Merge "Remove unused 'XMPGetInfo' and 'XMPGetResults' hooks"
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueRedis.php
index 82537f4..7edb6ad 100644 (file)
@@ -205,7 +205,7 @@ class JobQueueRedis extends JobQueue {
                        if ( $flags & self::QOS_ATOMIC ) {
                                $batches = array( $items ); // all or nothing
                        } else {
-                               $batches = array_chunk( $items, 500 ); // avoid tying up the server
+                               $batches = array_chunk( $items, 100 ); // avoid tying up the server
                        }
                        $failed = 0;
                        $pushed = 0;
@@ -222,9 +222,9 @@ class JobQueueRedis extends JobQueue {
 
                                throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
                        }
-                       JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki );
+                       JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
                        JobQueue::incrStats( 'job-insert-duplicate', $this->type,
-                               count( $items ) - $failed - $pushed, $this->wiki );
+                               count( $items ) - $failed - $pushed );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
@@ -300,7 +300,7 @@ LUA;
                                        break; // no jobs; nothing to do
                                }
 
-                               JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+                               JobQueue::incrStats( 'job-pop', $this->type );
                                $item = $this->unserialize( $blob );
                                if ( $item === false ) {
                                        wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
@@ -389,6 +389,8 @@ LUA;
 
                                return false;
                        }
+
+                       JobQueue::incrStats( 'job-ack', $this->type );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
@@ -473,70 +475,84 @@ LUA;
        /**
         * @see JobQueue::getAllQueuedJobs()
         * @return Iterator
+        * @throws JobQueueError
         */
        public function getAllQueuedJobs() {
                $conn = $this->getConnection();
                try {
-                       $that = $this;
-
-                       return new MappedIterator(
-                               $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
-                               function ( $uid ) use ( $that, $conn ) {
-                                       return $that->getJobFromUidInternal( $uid, $conn );
-                               },
-                               array( 'accept' => function ( $job ) {
-                                       return is_object( $job );
-                               } )
-                       );
+                       $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
+
+               return $this->getJobIterator( $conn, $uids );
        }
 
        /**
-        * @see JobQueue::getAllQueuedJobs()
+        * @see JobQueue::getAllDelayedJobs()
         * @return Iterator
+        * @throws JobQueueError
         */
        public function getAllDelayedJobs() {
                $conn = $this->getConnection();
                try {
-                       $that = $this;
-
-                       return new MappedIterator( // delayed jobs
-                               $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
-                               function ( $uid ) use ( $that, $conn ) {
-                                       return $that->getJobFromUidInternal( $uid, $conn );
-                               },
-                               array( 'accept' => function ( $job ) {
-                                       return is_object( $job );
-                               } )
-                       );
+                       $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $conn, $e );
+               }
+
+               return $this->getJobIterator( $conn, $uids );
+       }
+
+       /**
+        * @see JobQueue::getAllAcquiredJobs()
+        * @return Iterator
+        * @throws JobQueueError
+        */
+       public function getAllAcquiredJobs() {
+               $conn = $this->getConnection();
+               try {
+                       $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
+
+               return $this->getJobIterator( $conn, $uids );
        }
 
        /**
         * @see JobQueue::getAllAbandonedJobs()
         * @return Iterator
+        * @throws JobQueueError
         */
        public function getAllAbandonedJobs() {
                $conn = $this->getConnection();
                try {
-                       $that = $this;
-
-                       return new MappedIterator( // delayed jobs
-                               $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ),
-                               function ( $uid ) use ( $that, $conn ) {
-                                       return $that->getJobFromUidInternal( $uid, $conn );
-                               },
-                               array( 'accept' => function ( $job ) {
-                                       return is_object( $job );
-                               } )
-                       );
+                       $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
+
+               return $this->getJobIterator( $conn, $uids );
+       }
+
+       /**
+        * @param RedisConnRef $conn
+        * @param array $uids List of job UUIDs
+        * @return MappedIterator
+        */
+       protected function getJobIterator( RedisConnRef $conn, array $uids ) {
+               $that = $this;
+
+               return new MappedIterator(
+                       $uids,
+                       function ( $uid ) use ( $that, $conn ) {
+                               return $that->getJobFromUidInternal( $uid, $conn );
+                       },
+                       array( 'accept' => function ( $job ) {
+                               return is_object( $job );
+                       } )
+               );
        }
 
        public function getCoalesceLocationInternal() {
@@ -583,7 +599,7 @@ LUA;
                        if ( $data === false ) {
                                return false; // not found
                        }
-                       $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
+                       $item = $this->unserialize( $data );
                        if ( !is_array( $item ) ) { // this shouldn't happen
                                throw new MWException( "Could not find job with ID '$uid'." );
                        }
@@ -597,13 +613,6 @@ LUA;
                }
        }
 
-       /**
-        * @return array
-        */
-       protected function doGetPeriodicTasks() {
-               return array(); // managed in the runner loop
-       }
-
        /**
         * @param IJobSpecification $job
         * @return array