From 4ace725b4e12090d4bd5d18059d942e0ce97a063 Mon Sep 17 00:00:00 2001 From: Aaron Schulz Date: Mon, 8 Jul 2013 16:30:41 -0700 Subject: [PATCH] jobqueue: avoid exceptions due to races in redis job listing functions * Also improved MappedIterator to allow for an accept() closure. Change-Id: I614dc6e98b5c6297f3efc7a0004fbabd19624915 --- includes/MappedIterator.php | 84 +++++++++++++++++++++------------- includes/job/JobQueueRedis.php | 12 +++-- 2 files changed, 60 insertions(+), 36 deletions(-) diff --git a/includes/MappedIterator.php b/includes/MappedIterator.php index 14495f2f06..70d20327df 100644 --- a/includes/MappedIterator.php +++ b/includes/MappedIterator.php @@ -26,11 +26,15 @@ * * @since 1.21 */ -class MappedIterator implements Iterator { - /** @var Iterator */ - protected $baseIterator; +class MappedIterator extends FilterIterator { /** @var callable */ protected $vCallback; + /** @var callable */ + protected $aCallback; + /** @var array */ + protected $cache = array(); + + protected $rewound = false; // boolean; whether rewind() has been called /** * Build an new iterator from a base iterator by having the former wrap the @@ -38,59 +42,73 @@ class MappedIterator implements Iterator { * The callback takes the result of current() on the base iterator as an argument. * The keys of the base iterator are reused verbatim. * + * An "accept" callback can also be provided which will be called for each value in + * the base iterator (post-callback) and will return true if that value should be + * included in iteration of the MappedIterator (otherwise it will be filtered out). + * * @param Iterator|Array $iter - * @param callable $vCallback + * @param callable $vCallback Value transformation callback + * @param array $options Options map (includes "accept") (since 1.22) * @throws MWException */ - public function __construct( $iter, $vCallback ) { + public function __construct( $iter, $vCallback, array $options = array() ) { if ( is_array( $iter ) ) { - $this->baseIterator = new ArrayIterator( $iter ); + $baseIterator = new ArrayIterator( $iter ); } elseif ( $iter instanceof Iterator ) { - $this->baseIterator = $iter; + $baseIterator = $iter; } else { throw new MWException( "Invalid base iterator provided." ); } + parent::__construct( $baseIterator ); $this->vCallback = $vCallback; + $this->aCallback = isset( $options['accept'] ) ? $options['accept'] : null; + } + + public function next() { + $this->cache = array(); + parent::next(); } - /** - * @return void - */ public function rewind() { - $this->baseIterator->rewind(); + $this->rewound = true; + $this->cache = array(); + parent::rewind(); } - /** - * @return Mixed|null Returns null if out of range - */ - public function current() { - if ( !$this->baseIterator->valid() ) { - return null; // out of range + public function accept() { + $value = call_user_func( $this->vCallback, $this->getInnerIterator()->current() ); + $ok = ( $this->aCallback ) ? call_user_func( $this->aCallback, $value ) : true; + if ( $ok ) { + $this->cache['current'] = $value; } - return call_user_func_array( $this->vCallback, array( $this->baseIterator->current() ) ); + return $ok; } - /** - * @return Mixed|null Returns null if out of range - */ public function key() { - if ( !$this->baseIterator->valid() ) { - return null; // out of range - } - return $this->baseIterator->key(); + $this->init(); + return parent::key(); } - /** - * @return void - */ - public function next() { - $this->baseIterator->next(); + public function valid() { + $this->init(); + return parent::valid(); + } + + public function current() { + $this->init(); + if ( parent::valid() ) { + return $this->cache['current']; + } else { + return null; // out of range + } } /** - * @return bool + * Obviate the usual need for rewind() before using a FilterIterator in a manual loop */ - public function valid() { - return $this->baseIterator->valid(); + protected function init() { + if ( !$this->rewound ) { + $this->rewind(); + } } } diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index 1f5b7619bb..939fa42c17 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -519,7 +519,8 @@ LUA; $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ), function( $uid ) use ( $that, $conn ) { return $that->getJobFromUidInternal( $uid, $conn ); - } + }, + array( 'accept' => function ( $job ) { return is_object( $job ); } ) ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); @@ -538,7 +539,8 @@ LUA; $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), function( $uid ) use ( $that, $conn ) { return $that->getJobFromUidInternal( $uid, $conn ); - } + }, + array( 'accept' => function ( $job ) { return is_object( $job ); } ) ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); @@ -550,11 +552,15 @@ LUA; * * @param $uid string * @param $conn RedisConnRef - * @return Job + * @return Job|bool Returns false if the job does not exist * @throws MWException */ public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { try { + $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ); + if ( $data === false ) { + return false; // not found + } $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) ); if ( !is_array( $item ) ) { // this shouldn't happen throw new MWException( "Could not find job with ID '$uid'." ); -- 2.20.1