$conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
function( $uid ) use ( $that, $conn ) {
return $that->getJobFromUidInternal( $uid, $conn );
- }
+ },
+ array( 'accept' => function ( $job ) { return is_object( $job ); } )
);
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
$conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
function( $uid ) use ( $that, $conn ) {
return $that->getJobFromUidInternal( $uid, $conn );
- }
+ },
+ array( 'accept' => function ( $job ) { return is_object( $job ); } )
);
} catch ( RedisException $e ) {
$this->throwRedisException( $this->server, $conn, $e );
*
* @param $uid string
* @param $conn RedisConnRef
- * @return Job
+ * @return Job|bool Returns false if the job does not exist
* @throws MWException
*/
public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
try {
+ $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
+ if ( $data === false ) {
+ return false; // not found
+ }
$item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
if ( !is_array( $item ) ) { // this shouldn't happen
throw new MWException( "Could not find job with ID '$uid'." );
protected function getConnection() {
$conn = $this->redisPool->getConnection( $this->server );
if ( !$conn ) {
- throw new MWException( "Unable to connect to redis server." );
+ throw new JobQueueConnectionError( "Unable to connect to redis server." );
}
return $conn;
}
*/
protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
$this->redisPool->handleException( $server, $conn, $e );
- throw new MWException( "Redis server error: {$e->getMessage()}\n" );
+ throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
}
/**