/** @var int Maximum number of times to try a job */
protected $maxTries;
- /** @var bool Allow delayed jobs */
- protected $checkDelay;
-
/** @var BagOStuff */
protected $dupCache;
+ /** @var JobQueueAggregator */
+ protected $aggr;
const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
if ( !in_array( $this->order, $this->supportedOrders() ) ) {
throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
}
- $this->checkDelay = !empty( $params['checkDelay'] );
- if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
- throw new MWException( __CLASS__ . " does not support delayed jobs." );
- }
$this->dupCache = wfGetCache( CACHE_ANYTHING );
+ $this->aggr = isset( $params['aggregator'] )
+ ? $params['aggregator']
+ : new JobQueueAggregatorNull( array() );
}
/**
* but not acknowledged as completed after this many seconds. Recycling
* of jobs simple means re-inserting them into the queue. Jobs can be
* attempted up to three times before being discarded.
- * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
- * This lets delayed jobs wait in a staging area until a given timestamp is
- * reached, at which point they will enter the queue. If this is not enabled
- * or not supported, an exception will be thrown on delayed job insertion.
*
* Queue classes should throw an exception if they do not support the options given.
*
return $this->order;
}
- /**
- * @return bool Whether delayed jobs are enabled
- * @since 1.22
- */
- final public function delayedJobsEnabled() {
- return $this->checkDelay;
- }
-
/**
* Get the allowed queue orders for configuration validation
*
return false; // not implemented
}
+ /**
+ * @return bool Whether delayed jobs are enabled
+ * @since 1.22
+ */
+ final public function delayedJobsEnabled() {
+ return $this->supportsDelayedJobs();
+ }
+
/**
* Quickly check if the queue has no available (unacquired, non-delayed) jobs.
* Queue classes should use caching if they are any slower without memcached.
* This does not require $wgJobClasses to be set for the given job type.
* Outside callers should use JobQueueGroup::push() instead of this function.
*
- * @param Job|array $jobs A single job or an array of Jobs
+ * @param JobSpecification|JobSpecification[] $jobs
* @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
* @return void
* @throws JobQueueError
*/
final public function push( $jobs, $flags = 0 ) {
- $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
+ $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ $this->batchPush( $jobs, $flags );
}
/**
* This does not require $wgJobClasses to be set for the given job type.
* Outside callers should use JobQueueGroup::push() instead of this function.
*
- * @param array $jobs List of Jobs
+ * @param JobSpecification[] $jobs
* @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
* @return void
* @throws MWException
if ( $job->getType() !== $this->type ) {
throw new MWException(
"Got '{$job->getType()}' job; expected a '{$this->type}' job." );
- } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
+ } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
throw new MWException(
"Got delayed '{$job->getType()}' job; delays are not supported." );
}
}
$this->doBatchPush( $jobs, $flags );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
+
+ foreach ( $jobs as $job ) {
+ if ( $job->isRootJob() ) {
+ $this->deduplicateRootJob( $job );
+ }
+ }
}
/**
* @see JobQueue::batchPush()
- * @param array $jobs
+ * @param JobSpecification[] $jobs
* @param int $flags
*/
abstract protected function doBatchPush( array $jobs, $flags );
$job = $this->doPop();
+ if ( !$job ) {
+ $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
+ }
+
// Flag this job as an old duplicate based on its "root" job...
try {
if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
- JobQueue::incrStats( 'job-pop-duplicate', $this->type, 1, $this->wiki );
+ JobQueue::incrStats( 'job-pop-duplicate', $this->type );
$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
}
} catch ( Exception $e ) {
*
* This does nothing for certain queue classes.
*
- * @param Job $job
+ * @param IJobSpecification $job
* @throws MWException
* @return bool
*/
- final public function deduplicateRootJob( Job $job ) {
+ final public function deduplicateRootJob( IJobSpecification $job ) {
if ( $job->getType() !== $this->type ) {
throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
/**
* @see JobQueue::deduplicateRootJob()
- * @param Job $job
+ * @param IJobSpecification $job
* @throws MWException
* @return bool
*/
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
if ( !$job->hasRootJobParams() ) {
throw new MWException( "Cannot register root job; missing parameters." );
}
protected function doWaitForBackups() {
}
- /**
- * Return a map of task names to task definition maps.
- * A "task" is a fast periodic queue maintenance action.
- * Mutually exclusive tasks must implement their own locking in the callback.
- *
- * Each task value is an associative array with:
- * - name : the name of the task
- * - callback : a PHP callable that performs the task
- * - period : the period in seconds corresponding to the task frequency
- *
- * @return array
- */
- final public function getPeriodicTasks() {
- $tasks = $this->doGetPeriodicTasks();
- foreach ( $tasks as $name => &$def ) {
- $def['name'] = $name;
- }
-
- return $tasks;
- }
-
- /**
- * @see JobQueue::getPeriodicTasks()
- * @return array
- */
- protected function doGetPeriodicTasks() {
- return array();
- }
-
/**
* Clear any process and persistent caches
*
return new ArrayIterator( array() ); // not implemented
}
+ /**
+ * Get an iterator to traverse over all claimed jobs in this queue
+ *
+ * Callers should be quick to iterator over it or few results
+ * will be returned due to jobs being acknowledged and deleted
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.26
+ */
+ public function getAllAcquiredJobs() {
+ return new ArrayIterator( array() ); // not implemented
+ }
+
+ /**
+ * Get an iterator to traverse over all abandoned jobs in this queue
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.25
+ */
+ public function getAllAbandonedJobs() {
+ return new ArrayIterator( array() ); // not implemented
+ }
+
/**
* Do not use this function outside of JobQueue/JobQueueGroup
*
* @param string $key Event type
* @param string $type Job type
* @param int $delta
- * @param string $wiki Wiki ID (added in 1.23)
* @since 1.22
*/
- public static function incrStats( $key, $type, $delta = 1, $wiki = null ) {
+ public static function incrStats( $key, $type, $delta = 1 ) {
wfIncrStats( $key, $delta );
wfIncrStats( "{$key}-{$type}", $delta );
- if ( $wiki !== null ) {
- wfIncrStats( "{$key}-{$type}-{$wiki}", $delta );
- }
}
/**