$failed += count( $itemBatch );
}
}
+ JobQueue::incrStats( 'inserts', $this->type, count( $items ) );
+ JobQueue::incrStats( 'inserts_actual', $this->type, $pushed );
+ JobQueue::incrStats( 'dupe_inserts', $this->type,
+ count( $items ) - $failed - $pushed );
if ( $failed > 0 ) {
- wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
-
- throw new RedisException( "Could not insert {$failed} {$this->type} job(s)." );
+ $err = "Could not insert {$failed} {$this->type} job(s).";
+ wfDebugLog( 'JobQueueRedis', $err );
+ throw new RedisException( $err );
}
- JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
- JobQueue::incrStats( 'job-insert-duplicate', $this->type,
- count( $items ) - $failed - $pushed );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
break; // no jobs; nothing to do
}
- JobQueue::incrStats( 'job-pop', $this->type );
+ JobQueue::incrStats( 'pops', $this->type );
$item = $this->unserialize( $blob );
if ( $item === false ) {
wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
}
+ $uuid = $job->metadata['uuid'];
$conn = $this->getConnection();
try {
static $script =
$this->getQueueKey( 'z-claimed' ), # KEYS[1]
$this->getQueueKey( 'h-attempts' ), # KEYS[2]
$this->getQueueKey( 'h-data' ), # KEYS[3]
- $job->metadata['uuid'] # ARGV[1]
+ $uuid # ARGV[1]
),
3 # number of first argument(s) that are keys
);
if ( !$res ) {
- wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+ wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." );
return false;
}
- JobQueue::incrStats( 'job-ack', $this->type );
+ JobQueue::incrStats( 'acks', $this->type );
} catch ( RedisException $e ) {
$this->throwRedisException( $conn, $e );
}
/**
* @see JobQueue::doDeduplicateRootJob()
- * @param Job $job
+ * @param IJobSpecification $job
* @return bool
* @throws JobQueueError
* @throws LogicException
*/
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
if ( !$job->hasRootJobParams() ) {
throw new LogicException( "Cannot register root job; missing parameters." );
}
// Get the last time this root job was enqueued
$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
} catch ( RedisException $e ) {
+ $timestamp = false;
$this->throwRedisException( $conn, $e );
}
$title = Title::makeTitle( $item['namespace'], $item['title'] );
$job = Job::factory( $item['type'], $title, $item['params'] );
$job->metadata['uuid'] = $item['uuid'];
+ $job->metadata['timestamp'] = $item['timestamp'];
+ // Add in attempt count for debugging at showJobs.php
+ $job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid );
return $job;
} catch ( RedisException $e ) {
$title = Title::makeTitle( $fields['namespace'], $fields['title'] );
$job = Job::factory( $fields['type'], $title, $fields['params'] );
$job->metadata['uuid'] = $fields['uuid'];
+ $job->metadata['timestamp'] = $fields['timestamp'];
return $job;
}