jobqueue: avoid exceptions due to races in redis job listing functions
authorAaron Schulz <aschulz@wikimedia.org>
Mon, 8 Jul 2013 23:30:41 +0000 (16:30 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 9 Jul 2013 23:57:40 +0000 (16:57 -0700)
* Also improved MappedIterator to allow for an accept() closure.

Change-Id: I614dc6e98b5c6297f3efc7a0004fbabd19624915

includes/MappedIterator.php
includes/job/JobQueueRedis.php

index 14495f2..70d2032 100644 (file)
  *
  * @since 1.21
  */
-class MappedIterator implements Iterator {
-       /** @var Iterator */
-       protected $baseIterator;
+class MappedIterator extends FilterIterator {
        /** @var callable */
        protected $vCallback;
+       /** @var callable */
+       protected $aCallback;
+       /** @var array */
+       protected $cache = array();
+
+       protected $rewound = false; // boolean; whether rewind() has been called
 
        /**
         * Build an new iterator from a base iterator by having the former wrap the
@@ -38,59 +42,73 @@ class MappedIterator implements Iterator {
         * The callback takes the result of current() on the base iterator as an argument.
         * The keys of the base iterator are reused verbatim.
         *
+        * An "accept" callback can also be provided which will be called for each value in
+        * the base iterator (post-callback) and will return true if that value should be
+        * included in iteration of the MappedIterator (otherwise it will be filtered out).
+        *
         * @param Iterator|Array $iter
-        * @param callable $vCallback
+        * @param callable $vCallback Value transformation callback
+        * @param array $options Options map (includes "accept") (since 1.22)
         * @throws MWException
         */
-       public function __construct( $iter, $vCallback ) {
+       public function __construct( $iter, $vCallback, array $options = array() ) {
                if ( is_array( $iter ) ) {
-                       $this->baseIterator = new ArrayIterator( $iter );
+                       $baseIterator = new ArrayIterator( $iter );
                } elseif ( $iter instanceof Iterator ) {
-                       $this->baseIterator = $iter;
+                       $baseIterator = $iter;
                } else {
                        throw new MWException( "Invalid base iterator provided." );
                }
+               parent::__construct( $baseIterator );
                $this->vCallback = $vCallback;
+               $this->aCallback = isset( $options['accept'] ) ? $options['accept'] : null;
+       }
+
+       public function next() {
+               $this->cache = array();
+               parent::next();
        }
 
-       /**
-        * @return void
-        */
        public function rewind() {
-               $this->baseIterator->rewind();
+               $this->rewound = true;
+               $this->cache = array();
+               parent::rewind();
        }
 
-       /**
-        * @return Mixed|null Returns null if out of range
-        */
-       public function current() {
-               if ( !$this->baseIterator->valid() ) {
-                       return null; // out of range
+       public function accept() {
+               $value = call_user_func( $this->vCallback, $this->getInnerIterator()->current() );
+               $ok = ( $this->aCallback ) ? call_user_func( $this->aCallback, $value ) : true;
+               if ( $ok ) {
+                       $this->cache['current'] = $value;
                }
-               return call_user_func_array( $this->vCallback, array( $this->baseIterator->current() ) );
+               return $ok;
        }
 
-       /**
-        * @return Mixed|null Returns null if out of range
-        */
        public function key() {
-               if ( !$this->baseIterator->valid() ) {
-                       return null; // out of range
-               }
-               return $this->baseIterator->key();
+               $this->init();
+               return parent::key();
        }
 
-       /**
-        * @return void
-        */
-       public function next() {
-               $this->baseIterator->next();
+       public function valid() {
+               $this->init();
+               return parent::valid();
+       }
+
+       public function current() {
+               $this->init();
+               if ( parent::valid() ) {
+                       return $this->cache['current'];
+               } else {
+                       return null; // out of range
+               }
        }
 
        /**
-        * @return bool
+        * Obviate the usual need for rewind() before using a FilterIterator in a manual loop
         */
-       public function valid() {
-               return $this->baseIterator->valid();
+       protected function init() {
+               if ( !$this->rewound ) {
+                       $this->rewind();
+               }
        }
 }
index 1f5b761..939fa42 100644 (file)
@@ -519,7 +519,8 @@ LUA;
                                $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
                                function( $uid ) use ( $that, $conn ) {
                                        return $that->getJobFromUidInternal( $uid, $conn );
-                               }
+                               },
+                               array( 'accept' => function ( $job ) { return is_object( $job ); } )
                        );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -538,7 +539,8 @@ LUA;
                                $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
                                function( $uid ) use ( $that, $conn ) {
                                        return $that->getJobFromUidInternal( $uid, $conn );
-                               }
+                               },
+                               array( 'accept' => function ( $job ) { return is_object( $job ); } )
                        );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -550,11 +552,15 @@ LUA;
         *
         * @param $uid string
         * @param $conn RedisConnRef
-        * @return Job
+        * @return Job|bool Returns false if the job does not exist
         * @throws MWException
         */
        public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
                try {
+                       $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
+                       if ( $data === false ) {
+                               return false; // not found
+                       }
                        $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
                        if ( !is_array( $item ) ) { // this shouldn't happen
                                throw new MWException( "Could not find job with ID '$uid'." );