/** @var int Maximum number of times to try a job */
protected $maxTries;
- /** @var bool Allow delayed jobs */
- protected $checkDelay;
-
/** @var BagOStuff */
protected $dupCache;
+ /** @var JobQueueAggregator */
+ protected $aggr;
const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
if ( !in_array( $this->order, $this->supportedOrders() ) ) {
throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
}
- $this->checkDelay = !empty( $params['checkDelay'] );
- if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
- throw new MWException( __CLASS__ . " does not support delayed jobs." );
- }
$this->dupCache = wfGetCache( CACHE_ANYTHING );
+ $this->aggr = isset( $params['aggregator'] )
+ ? $params['aggregator']
+ : new JobQueueAggregatorNull( array() );
}
/**
* but not acknowledged as completed after this many seconds. Recycling
* of jobs simple means re-inserting them into the queue. Jobs can be
* attempted up to three times before being discarded.
- * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
- * This lets delayed jobs wait in a staging area until a given timestamp is
- * reached, at which point they will enter the queue. If this is not enabled
- * or not supported, an exception will be thrown on delayed job insertion.
*
* Queue classes should throw an exception if they do not support the options given.
*
return $this->order;
}
- /**
- * @return bool Whether delayed jobs are enabled
- * @since 1.22
- */
- final public function delayedJobsEnabled() {
- return $this->checkDelay;
- }
-
/**
* Get the allowed queue orders for configuration validation
*
return false; // not implemented
}
+ /**
+ * @return bool Whether delayed jobs are enabled
+ * @since 1.22
+ */
+ final public function delayedJobsEnabled() {
+ return $this->supportsDelayedJobs();
+ }
+
/**
* Quickly check if the queue has no available (unacquired, non-delayed) jobs.
* Queue classes should use caching if they are any slower without memcached.
* @throws JobQueueError
*/
final public function push( $jobs, $flags = 0 ) {
- $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ $this->batchPush( $jobs, $flags );
}
/**
if ( $job->getType() !== $this->type ) {
throw new MWException(
"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
- } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
+ } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
throw new MWException(
"Got delayed '{$job->getType()}' job; delays are not supported." );
}
}
$this->doBatchPush( $jobs, $flags );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
/**
$job = $this->doPop();
+ if ( !$job ) {
+ $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
+ }
+
// Flag this job as an old duplicate based on its "root" job...
try {
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
- JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
+ JobQueue::incrStats( 'job-pop-duplicate', $this->type );
$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
}
} catch ( Exception $e ) {
return new ArrayIterator( array() ); // not implemented
}
+ /**
+ * Get an iterator to traverse over all claimed jobs in this queue
+ *
+ * Callers should be quick to iterator over it or few results
+ * will be returned due to jobs being acknowledged and deleted
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.26
+ */
+ public function getAllAcquiredJobs() {
+ return new ArrayIterator( array() ); // not implemented
+ }
+
+ /**
+ * Get an iterator to traverse over all abandoned jobs in this queue
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.25
+ */
+ public function getAllAbandonedJobs() {
+ return new ArrayIterator( array() ); // not implemented
+ }
+
/**
* Do not use this function outside of JobQueue/JobQueueGroup
*
* @param string $key Event type
* @param string $type Job type
* @param int $delta
- * @param string $wiki Wiki ID (added in 1.23)
* @since 1.22
*/
- public static function incrStats( $key, $type, $delta = 1, $wiki = null ) {
+ public static function incrStats( $key, $type, $delta = 1 ) {
wfIncrStats( $key, $delta );
wfIncrStats( "{$key}-{$type}", $delta );
- if ( $wiki !== null ) {
- wfIncrStats( "{$key}-{$type}-{$wiki}", $delta );
- }
}
/**