jobqueue: improved performance of JobQueueGroup::getQueuesWithJobs()
authorAaron Schulz <aschulz@wikimedia.org>
Thu, 4 Jul 2013 07:05:19 +0000 (00:05 -0700)
committerTim Starling <tstarling@wikimedia.org>
Tue, 1 Oct 2013 03:18:48 +0000 (03:18 +0000)
* Also added a JobQueueGroup::getQueueSizes() function.
  This function is now used by the SiteStats to make is
  meaningful for when queues are not in the DB. This number
  is already exposed via the API.

bug: 45072
bug: 50635
bug: 9518
Change-Id: I75c16ffa14c963e7f8fb7cb390e6cc4cde0a5804

includes/SiteStats.php
includes/job/JobQueue.php
includes/job/JobQueueDB.php
includes/job/JobQueueFederated.php
includes/job/JobQueueGroup.php
includes/job/JobQueueRedis.php

index 6e2359a..355993c 100644 (file)
@@ -189,7 +189,7 @@ class SiteStats {
        static function jobs() {
                if ( !isset( self::$jobs ) ) {
                        $dbr = wfGetDB( DB_SLAVE );
-                       self::$jobs = $dbr->estimateRowCount( 'job' );
+                       self::$jobs = array_sum( JobQueueGroup::singleton()->getQueueSizes() );
                        /* Zero rows still do single row read for row that doesn't exist, but people are annoyed by that */
                        if ( self::$jobs == 1 ) {
                                self::$jobs = 0;
index 3e94b13..81e7b73 100644 (file)
@@ -616,6 +616,63 @@ abstract class JobQueue {
                return new ArrayIterator( array() ); // not implemented
        }
 
+       /**
+        * Do not use this function outside of JobQueue/JobQueueGroup
+        *
+        * @return string
+        * @since 1.22
+        */
+       public function getCoalesceLocationInternal() {
+               return null;
+       }
+
+       /**
+        * Check whether each of the given queues are empty.
+        * This is used for batching checks for queues stored at the same place.
+        *
+        * @param array $types List of queues types
+        * @return array|null (list of non-empty queue types) or null if unsupported
+        * @throws MWException
+        * @since 1.22
+        */
+       final public function getSiblingQueuesWithJobs( array $types ) {
+               $section = new ProfileSection( __METHOD__ );
+               return $this->doGetSiblingQueuesWithJobs( $types );
+       }
+
+       /**
+        * @see JobQueue::getSiblingQueuesWithJobs()
+        * @param array $types List of queues types
+        * @return array|null (list of queue types) or null if unsupported
+        */
+       protected function doGetSiblingQueuesWithJobs( array $types ) {
+               return null; // not supported
+       }
+
+       /**
+        * Check the size of each of the given queues.
+        * For queues not served by the same store as this one, 0 is returned.
+        * This is used for batching checks for queues stored at the same place.
+        *
+        * @param array $types List of queues types
+        * @return array|null (job type => whether queue is empty) or null if unsupported
+        * @throws MWException
+        * @since 1.22
+        */
+       final public function getSiblingQueueSizes( array $types ) {
+               $section = new ProfileSection( __METHOD__ );
+               return $this->doGetSiblingQueueSizes( $types );
+       }
+
+       /**
+        * @see JobQueue::getSiblingQueuesSize()
+        * @param array $types List of queues types
+        * @return array|null (list of queue types) or null if unsupported
+        */
+       protected function doGetSiblingQueueSizes( array $types ) {
+               return null; // not supported
+       }
+
        /**
         * Call wfIncrStats() for the queue overall and for the queue type
         *
index 3fa0655..2052fc1 100644 (file)
@@ -604,6 +604,34 @@ class JobQueueDB extends JobQueue {
                }
        }
 
+       public function getCoalesceLocationInternal() {
+               return $this->cluster ? "DBCluster:{$this->cluster}" : "LBFactory:{$this->wiki}";
+       }
+
+       protected function doGetSiblingQueuesWithJobs( array $types ) {
+               list( $dbr, $scope ) = $this->getSlaveDB();
+               $res = $dbr->select( 'job', 'DISTINCT job_cmd',
+                       array( 'job_cmd' => $types ), __METHOD__ );
+
+               $types = array();
+               foreach ( $res as $row ) {
+                       $types[] = $row->job_cmd;
+               }
+               return $types;
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               list( $dbr, $scope ) = $this->getSlaveDB();
+               $res = $dbr->select( 'job', array( 'job_cmd', 'COUNT(*) AS count' ),
+                       array( 'job_cmd' => $types ), __METHOD__, array( 'GROUP BY' => 'job_cmd' ) );
+
+               $sizes = array();
+               foreach ( $res as $row ) {
+                       $sizes[$row->job_cmd] = (int)$row->count;
+               }
+               return $sizes;
+       }
+
        /**
         * Recycle or destroy any jobs that have been claimed for too long
         *
index 35b80ca..d788c98 100644 (file)
@@ -421,6 +421,38 @@ class JobQueueFederated extends JobQueue {
                return $iterator;
        }
 
+       public function getCoalesceLocationInternal() {
+               return "JobQueueFederated:wiki:" . $this->wiki;
+       }
+
+       protected function doGetSiblingQueuesWithJobs( array $types ) {
+               $result = array();
+               foreach ( $this->partitionQueues as $queue ) {
+                       $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
+                       if ( is_array( $nonEmpty ) ) {
+                               $result = array_merge( $result, $nonEmpty );
+                       } else {
+                               return null; // not supported on all partitions; bail
+                       }
+               }
+               return array_values( array_unique( $result ) );
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               $result = array();
+               foreach ( $this->partitionQueues as $queue ) {
+                       $sizes = $queue->doGetSiblingQueueSizes( $types );
+                       if ( is_array( $sizes ) ) {
+                               foreach ( $sizes as $type => $size ) {
+                                       $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
+                               }
+                       } else {
+                               return null; // not supported on all partitions; bail
+                       }
+               }
+               return $result;
+       }
+
        public function setTestingPrefix( $key ) {
                foreach ( $this->partitionQueues as $queue ) {
                        $queue->setTestingPrefix( $key );
index e483e05..a20ea4a 100644 (file)
@@ -36,6 +36,9 @@ class JobQueueGroup {
 
        protected $wiki; // string; wiki ID
 
+       /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */
+       protected $coalescedQueues;
+
        const TYPE_DEFAULT = 1; // integer; jobs popped by default
        const TYPE_ANY = 2; // integer; any job
 
@@ -254,14 +257,71 @@ class JobQueueGroup {
         */
        public function getQueuesWithJobs() {
                $types = array();
-               foreach ( $this->getQueueTypes() as $type ) {
-                       if ( !$this->get( $type )->isEmpty() ) {
-                               $types[] = $type;
+               foreach ( $this->getCoalescedQueues() as $info ) {
+                       $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
+                       if ( is_array( $nonEmpty ) ) { // batching features supported
+                               $types = array_merge( $types, $nonEmpty );
+                       } else { // we have to go through the queues in the bucket one-by-one
+                               foreach ( $info['types'] as $type ) {
+                                       if ( !$this->get( $type )->isEmpty() ) {
+                                               $types[] = $type;
+                                       }
+                               }
                        }
                }
                return $types;
        }
 
+       /**
+        * Get the size of the queus for a list of job types
+        *
+        * @return Array Map of (job type => size)
+        */
+       public function getQueueSizes() {
+               $sizeMap = array();
+               foreach ( $this->getCoalescedQueues() as $info ) {
+                       $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
+                       if ( is_array( $sizes ) ) { // batching features supported
+                               $sizeMap = $sizeMap + $sizes;
+                       } else { // we have to go through the queues in the bucket one-by-one
+                               foreach ( $info['types'] as $type ) {
+                                       $sizeMap[$type] = $this->get( $type )->getSize();
+                               }
+                       }
+               }
+               return $sizeMap;
+       }
+
+       /**
+        * @return array
+        */
+       protected function getCoalescedQueues() {
+               global $wgJobTypeConf;
+
+               if ( $this->coalescedQueues === null ) {
+                       $this->coalescedQueues = array();
+                       foreach ( $wgJobTypeConf as $type => $conf ) {
+                               $queue = JobQueue::factory(
+                                       array( 'wiki' => $this->wiki, 'type' => 'null' ) + $conf );
+                               $loc = $queue->getCoalesceLocationInternal();
+                               if ( !isset( $this->coalescedQueues[$loc] ) ) {
+                                       $this->coalescedQueues[$loc]['queue'] = $queue;
+                                       $this->coalescedQueues[$loc]['types'] = array();
+                               }
+                               if ( $type === 'default' ) {
+                                       $this->coalescedQueues[$loc]['types'] = array_merge(
+                                               $this->coalescedQueues[$loc]['types'],
+                                               array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
+                                       );
+                               } else {
+                                       $this->coalescedQueues[$loc]['types'][] = $type;
+                               }
+                       }
+               }
+
+               return $this->coalescedQueues;
+       }
+
        /**
         * Check if jobs should not be popped of a queue right now.
         * This is only used for performance, such as to avoid spamming
index 57189a5..378e175 100644 (file)
@@ -501,7 +501,7 @@ LUA;
                        foreach ( $props as $prop ) {
                                $keys[] = $this->getQueueKey( $prop );
                        }
-                       $res = ( $conn->delete( $keys ) !== false );
+                       return ( $conn->delete( $keys ) !== false );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -547,6 +547,35 @@ LUA;
                }
        }
 
+       public function getCoalesceLocationInternal() {
+               return "RedisServer:" . $this->server;
+       }
+
+       protected function doGetSiblingQueuesWithJobs( array $types ) {
+               return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               $sizes = array(); // (type => size)
+               $types = array_values( $types ); // reindex
+               try {
+                       $conn = $this->getConnection();
+                       $conn->multi( Redis::PIPELINE );
+                       foreach ( $types as $type ) {
+                               $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
+                       }
+                       $res = $conn->exec();
+                       if ( is_array( $res ) ) {
+                               foreach ( $res as $i => $size ) {
+                                       $sizes[$types[$i]] = $size;
+                               }
+                       }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+               return $sizes;
+       }
+
        /**
         * This function should not be called outside JobQueueRedis
         *
@@ -804,14 +833,16 @@ LUA;
 
        /**
         * @param $prop string
+        * @param $type string|null
         * @return string
         */
-       private function getQueueKey( $prop ) {
+       private function getQueueKey( $prop, $type = null ) {
+               $type = is_string( $type ) ? $type : $this->type;
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
                if ( strlen( $this->key ) ) { // namespaced queue (for testing)
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
+                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
                } else {
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
+                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
                }
        }