foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
$dbw->insert( 'job', $rowBatch, $method );
}
- JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
- JobQueue::incrStats(
- 'job-insert-duplicate',
- $this->type,
+ JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
+ JobQueue::incrStats( 'dupe_inserts', $this->type,
count( $rowSet ) + count( $rowList ) - count( $rows )
);
} catch ( DBError $e ) {
if ( !$row ) {
break; // nothing to do
}
- JobQueue::incrStats( 'job-pop', $this->type );
+ JobQueue::incrStats( 'pops', $this->type );
// Get the job object from the row...
$title = Title::makeTitle( $row->job_namespace, $row->job_title );
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
$job->metadata['id'] = $row->job_id;
+ $job->metadata['timestamp'] = $row->job_timestamp;
break; // done
} while ( true );
$dbw->delete( 'job',
array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
- JobQueue::incrStats( 'job-ack', $this->type );
+ JobQueue::incrStats( 'acks', $this->type );
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
/**
* @see JobQueue::doDeduplicateRootJob()
- * @param Job $job
+ * @param IJobSpecification $job
* @throws MWException
* @return bool
*/
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
$params = $job->getParams();
if ( !isset( $params['rootJobSignature'] ) ) {
throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
* @return Iterator
*/
public function getAllQueuedJobs() {
+ return $this->getJobIterator( array( 'job_cmd' => $this->getType(), 'job_token' => '' ) );
+ }
+
+ /**
+ * @see JobQueue::getAllAcquiredJobs()
+ * @return Iterator
+ */
+ public function getAllAcquiredJobs() {
+ return $this->getJobIterator( array( 'job_cmd' => $this->getType(), "job_token > ''" ) );
+ }
+
+ /**
+ * @param array $conds Query conditions
+ * @return Iterator
+ */
+ protected function getJobIterator( array $conds ) {
$dbr = $this->getSlaveDB();
try {
return new MappedIterator(
- $dbr->select( 'job', self::selectFields(),
- array( 'job_cmd' => $this->getType(), 'job_token' => '' ) ),
- function ( $row ) use ( $dbr ) {
+ $dbr->select( 'job', self::selectFields(), $conds ),
+ function ( $row ) {
$job = Job::factory(
$row->job_cmd,
Title::makeTitle( $row->job_namespace, $row->job_title ),
- strlen( $row->job_params ) ? unserialize( $row->job_params ) : false
+ strlen( $row->job_params ) ? unserialize( $row->job_params ) : array()
);
$job->metadata['id'] = $row->job_id;
+ $job->metadata['timestamp'] = $row->job_timestamp;
+
return $job;
}
);
);
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'job-recycle', $this->type, $affected );
+ JobQueue::incrStats( 'recycles', $this->type, $affected );
$this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
}
$dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'job-abandon', $this->type, $affected );
+ JobQueue::incrStats( 'abandons', $this->type, $affected );
}
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );