if ( $flags & self::QOS_ATOMIC ) {
$batches = array( $items ); // all or nothing
} else {
- $batches = array_chunk( $items, 500 ); // avoid tying up the server
+ $batches = array_chunk( $items, 100 ); // avoid tying up the server
}
$failed = 0;
$pushed = 0;
throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
}
- JobQueue::incrStats( 'job-insert', $this->type, count( $items ), $this->wiki );
+ JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
JobQueue::incrStats( 'job-insert-duplicate', $this->type,
- count( $items ) - $failed - $pushed, $this->wiki );
+ count( $items ) - $failed - $pushed );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
break; // no jobs; nothing to do
}
- JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+ JobQueue::incrStats( 'job-pop', $this->type );
$item = $this->unserialize( $blob );
if ( $item === false ) {
wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
return false;
}
+
+ JobQueue::incrStats( 'job-ack', $this->type );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
/**
* @see JobQueue::getAllQueuedJobs()
* @return Iterator
+ * @throws JobQueueError
*/
public function getAllQueuedJobs() {
$conn = $this->getConnection();
try {
- $that = $this;
-
- return new MappedIterator(
- $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
- function ( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) {
- return is_object( $job );
- } )
- );
+ $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
+
+ return $this->getJobIterator( $conn, $uids );
}
/**
- * @see JobQueue::getAllQueuedJobs()
+ * @see JobQueue::getAllDelayedJobs()
* @return Iterator
+ * @throws JobQueueError
*/
public function getAllDelayedJobs() {
$conn = $this->getConnection();
try {
- $that = $this;
-
- return new MappedIterator( // delayed jobs
- $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
- function ( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) {
- return is_object( $job );
- } )
- );
+ $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $this->getJobIterator( $conn, $uids );
+ }
+
+ /**
+ * @see JobQueue::getAllAcquiredJobs()
+ * @return Iterator
+ * @throws JobQueueError
+ */
+ public function getAllAcquiredJobs() {
+ $conn = $this->getConnection();
+ try {
+ $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
+
+ return $this->getJobIterator( $conn, $uids );
}
/**
* @see JobQueue::getAllAbandonedJobs()
* @return Iterator
+ * @throws JobQueueError
*/
public function getAllAbandonedJobs() {
$conn = $this->getConnection();
try {
- $that = $this;
-
- return new MappedIterator( // delayed jobs
- $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ),
- function ( $uid ) use ( $that, $conn ) {
- return $that->getJobFromUidInternal( $uid, $conn );
- },
- array( 'accept' => function ( $job ) {
- return is_object( $job );
- } )
- );
+ $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
+
+ return $this->getJobIterator( $conn, $uids );
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @param array $uids List of job UUIDs
+ * @return MappedIterator
+ */
+ protected function getJobIterator( RedisConnRef $conn, array $uids ) {
+ $that = $this;
+
+ return new MappedIterator(
+ $uids,
+ function ( $uid ) use ( $that, $conn ) {
+ return $that->getJobFromUidInternal( $uid, $conn );
+ },
+ array( 'accept' => function ( $job ) {
+ return is_object( $job );
+ } )
+ );
}
public function getCoalesceLocationInternal() {
if ( $data === false ) {
return false; // not found
}
- $item = $this->unserialize( $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ) );
+ $item = $this->unserialize( $data );
if ( !is_array( $item ) ) { // this shouldn't happen
throw new MWException( "Could not find job with ID '$uid'." );
}
}
}
- /**
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array(); // managed in the runner loop
- }
-
/**
* @param IJobSpecification $job
* @return array