Made showJobs.php show claimed jobs too
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 17 Apr 2015 18:08:26 +0000 (11:08 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Fri, 17 Apr 2015 19:35:39 +0000 (12:35 -0700)
Change-Id: Ifc5eeff10fdcdbbe4ade7b32e5388fee4eb94bcf

includes/jobqueue/JobQueue.php
includes/jobqueue/JobQueueFederated.php
includes/jobqueue/JobQueueRedis.php
maintenance/showJobs.php

index 91fe86c..ef718e0 100644 (file)
@@ -615,6 +615,20 @@ abstract class JobQueue {
                return new ArrayIterator( array() ); // not implemented
        }
 
+       /**
+        * Get an iterator to traverse over all claimed jobs in this queue
+        *
+        * Callers should be quick to iterator over it or few results
+        * will be returned due to jobs being acknowledged and deleted
+        *
+        * @return Iterator
+        * @throws JobQueueError
+        * @since 1.26
+        */
+       public function getAllAcquiredJobs() {
+               return new ArrayIterator( array() ); // not implemented
+       }
+
        /**
         * Get an iterator to traverse over all abandoned jobs in this queue
         *
index d985d44..178ce8a 100644 (file)
@@ -422,6 +422,17 @@ class JobQueueFederated extends JobQueue {
                return $iterator;
        }
 
+       public function getAllAcquiredJobs() {
+               $iterator = new AppendIterator();
+
+               /** @var JobQueue $queue */
+               foreach ( $this->partitionQueues as $queue ) {
+                       $iterator->append( $queue->getAllAcquiredJobs() );
+               }
+
+               return $iterator;
+       }
+
        public function getAllAbandonedJobs() {
                $iterator = new AppendIterator();
 
index 82537f4..0f87df7 100644 (file)
@@ -473,70 +473,84 @@ LUA;
        /**
         * @see JobQueue::getAllQueuedJobs()
         * @return Iterator
+        * @throws JobQueueError
         */
        public function getAllQueuedJobs() {
                $conn = $this->getConnection();
                try {
-                       $that = $this;
-
-                       return new MappedIterator(
-                               $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
-                               function ( $uid ) use ( $that, $conn ) {
-                                       return $that->getJobFromUidInternal( $uid, $conn );
-                               },
-                               array( 'accept' => function ( $job ) {
-                                       return is_object( $job );
-                               } )
-                       );
+                       $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
+
+               return $this->getJobIterator( $conn, $uids );
        }
 
        /**
-        * @see JobQueue::getAllQueuedJobs()
+        * @see JobQueue::getAllDelayedJobs()
         * @return Iterator
+        * @throws JobQueueError
         */
        public function getAllDelayedJobs() {
                $conn = $this->getConnection();
                try {
-                       $that = $this;
-
-                       return new MappedIterator( // delayed jobs
-                               $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
-                               function ( $uid ) use ( $that, $conn ) {
-                                       return $that->getJobFromUidInternal( $uid, $conn );
-                               },
-                               array( 'accept' => function ( $job ) {
-                                       return is_object( $job );
-                               } )
-                       );
+                       $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $conn, $e );
+               }
+
+               return $this->getJobIterator( $conn, $uids );
+       }
+
+       /**
+        * @see JobQueue::getAllAcquiredJobs()
+        * @return Iterator
+        * @throws JobQueueError
+        */
+       public function getAllAcquiredJobs() {
+               $conn = $this->getConnection();
+               try {
+                       $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
+
+               return $this->getJobIterator( $conn, $uids );
        }
 
        /**
         * @see JobQueue::getAllAbandonedJobs()
         * @return Iterator
+        * @throws JobQueueError
         */
        public function getAllAbandonedJobs() {
                $conn = $this->getConnection();
                try {
-                       $that = $this;
-
-                       return new MappedIterator( // delayed jobs
-                               $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ),
-                               function ( $uid ) use ( $that, $conn ) {
-                                       return $that->getJobFromUidInternal( $uid, $conn );
-                               },
-                               array( 'accept' => function ( $job ) {
-                                       return is_object( $job );
-                               } )
-                       );
+                       $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
+
+               return $this->getJobIterator( $conn, $uids );
+       }
+
+       /**
+        * @param RedisConnRef $conn
+        * @param array $uids List of job UUIDs
+        * @return MappedIterator
+        */
+       protected function getJobIterator( RedisConnRef $conn, array $uids ) {
+               $that = $this;
+
+               return new MappedIterator(
+                       $uids,
+                       function ( $uid ) use ( $that, $conn ) {
+                               return $that->getJobFromUidInternal( $uid, $conn );
+                       },
+                       array( 'accept' => function ( $job ) {
+                               return is_object( $job );
+                       } )
+               );
        }
 
        public function getCoalesceLocationInternal() {
@@ -583,7 +597,7 @@ LUA;
                        if ( $data === false ) {
                                return false; // not found
                        }
-                       $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
+                       $item = $this->unserialize( $data );
                        if ( !is_array( $item ) ) { // this shouldn't happen
                                throw new MWException( "Could not find job with ID '$uid'." );
                        }
index 9e9ad32..a989aef 100644 (file)
@@ -58,6 +58,9 @@ class ShowJobs extends Maintenance {
                                foreach ( $queue->getAllDelayedJobs() as $job ) {
                                        $this->output( $job->toString() . " status=delayed\n" );
                                }
+                               foreach ( $queue->getAllAcquiredJobs() as $job ) {
+                                       $this->output( $job->toString() . " status=claimed\n" );
+                               }
                                foreach ( $queue->getAllAbandonedJobs() as $job ) {
                                        $this->output( $job->toString() . " status=abandoned\n" );
                                }