protected $order; // string; job priority for pop()
protected $claimTTL; // integer; seconds
protected $maxTries; // integer; maximum number of times to try a job
+ protected $checkDelay; // boolean; allow delayed jobs
- const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
+ const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
+ const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions (b/c)
const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
if ( !in_array( $this->order, $this->supportedOrders() ) ) {
throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
}
+ $this->checkDelay = !empty( $params['checkDelay'] );
+ if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
+ throw new MWException( __CLASS__ . " does not support delayed jobs." );
+ }
}
/**
* Get a job queue object of the specified type.
* $params includes:
- * - class : What job class to use (determines job type)
- * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
- * - type : The name of the job types this queue handles
- * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
- * If "fifo" is used, the queue will effectively be FIFO. Note that
- * job completion will not appear to be exactly FIFO if there are multiple
- * job runners since jobs can take different times to finish once popped.
- * If "timestamp" is used, the queue will at least be loosely ordered
- * by timestamp, allowing for some jobs to be popped off out of order.
- * If "random" is used, pop() will pick jobs in random order.
- * Note that it may only be weakly random (e.g. a lottery of the oldest X).
- * If "any" is choosen, the queue will use whatever order is the fastest.
- * This might be useful for improving concurrency for job acquisition.
- * - claimTTL : If supported, the queue will recycle jobs that have been popped
- * but not acknowledged as completed after this many seconds. Recycling
- * of jobs simple means re-inserting them into the queue. Jobs can be
- * attempted up to three times before being discarded.
+ * - class : What job class to use (determines job type)
+ * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
+ * - type : The name of the job types this queue handles
+ * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
+ * If "fifo" is used, the queue will effectively be FIFO. Note that job
+ * completion will not appear to be exactly FIFO if there are multiple
+ * job runners since jobs can take different times to finish once popped.
+ * If "timestamp" is used, the queue will at least be loosely ordered
+ * by timestamp, allowing for some jobs to be popped off out of order.
+ * If "random" is used, pop() will pick jobs in random order.
+ * Note that it may only be weakly random (e.g. a lottery of the oldest X).
+ * If "any" is choosen, the queue will use whatever order is the fastest.
+ * This might be useful for improving concurrency for job acquisition.
+ * - claimTTL : If supported, the queue will recycle jobs that have been popped
+ * but not acknowledged as completed after this many seconds. Recycling
+ * of jobs simple means re-inserting them into the queue. Jobs can be
+ * attempted up to three times before being discarded.
+ * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
+ * This lets delayed jobs wait in a staging area until a given timestamp is
+ * reached, at which point they will enter the queue. If this is not enabled
+ * or not supported, an exception will be thrown on delayed job insertion.
*
* Queue classes should throw an exception if they do not support the options given.
*
abstract protected function optimalOrder();
/**
- * Quickly check if the queue is empty (has no available jobs).
+ * @return boolean Whether delayed jobs are supported
+ */
+ protected function supportsDelayedJobs() {
+ return false; // not implemented
+ }
+
+ /**
+ * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
* Queue classes should use caching if they are any slower without memcached.
*
* If caching is used, this might return false when there are actually no jobs.
abstract protected function doIsEmpty();
/**
- * Get the number of available (unacquired) jobs in the queue.
+ * Get the number of available (unacquired, non-delayed) jobs in the queue.
* Queue classes should use caching if they are any slower without memcached.
*
* If caching is used, this number might be out of date for a minute.
*/
abstract protected function doGetAcquiredCount();
+ /**
+ * Get the number of delayed jobs (these are temporarily out of the queue).
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return integer
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function getDelayedCount() {
+ wfProfileIn( __METHOD__ );
+ $res = $this->doGetDelayedCount();
+ wfProfileOut( __METHOD__ );
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getDelayedCount()
+ * @return integer
+ */
+ protected function doGetDelayedCount() {
+ return 0; // not implemented
+ }
+
/**
* Push a single jobs into the queue.
* This does not require $wgJobClasses to be set for the given job type.
* Outside callers should use JobQueueGroup::push() instead of this function.
*
* @param $jobs Job|Array
- * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
+ * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
* @return bool Returns false on failure
* @throws MWException
*/
* Outside callers should use JobQueueGroup::push() instead of this function.
*
* @param array $jobs List of Jobs
- * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
+ * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
* @return bool Returns false on failure
* @throws MWException
*/
foreach ( $jobs as $job ) {
if ( $job->getType() !== $this->type ) {
- throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ throw new MWException(
+ "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
+ } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
+ throw new MWException(
+ "Got delayed '{$job->getType()}' job; delays are not supported." );
}
}
protected function doFlushCaches() {}
/**
- * Get an iterator to traverse over all of the jobs in this queue.
- * This does not include jobs that are current acquired. In general,
- * this should only be called on a queue that is no longer being popped.
+ * Get an iterator to traverse over all available jobs in this queue.
+ * This does not include jobs that are currently acquired or delayed.
+ * This should only be called on a queue that is no longer being popped.
*
* @return Iterator|Traversable|Array
* @throws MWException
*/
abstract public function getAllQueuedJobs();
+ /**
+ * Get an iterator to traverse over all delayed jobs in this queue.
+ * This should only be called on a queue that is no longer being popped.
+ *
+ * @return Iterator|Traversable|Array
+ * @throws MWException
+ * @since 1.22
+ */
+ public function getAllDelayedJobs() {
+ return array(); // not implemented
+ }
+
/**
* Namespace the queue with a key to isolate it for testing
*