protected $order; // string; job priority for pop()
protected $claimTTL; // integer; seconds
protected $maxTries; // integer; maximum number of times to try a job
+ protected $checkDelay; // boolean; allow delayed jobs
- const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
+ const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
+ const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions (b/c)
+
+ const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
/**
* @param $params array
if ( !in_array( $this->order, $this->supportedOrders() ) ) {
throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
}
+ $this->checkDelay = !empty( $params['checkDelay'] );
+ if ( $this->checkDelay && !$this->supportsDelayedJobs() ) {
+ throw new MWException( __CLASS__ . " does not support delayed jobs." );
+ }
}
/**
* Get a job queue object of the specified type.
* $params includes:
- * - class : What job class to use (determines job type)
- * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
- * - type : The name of the job types this queue handles
- * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
- * If "fifo" is used, the queue will effectively be FIFO. Note that
- * job completion will not appear to be exactly FIFO if there are multiple
- * job runners since jobs can take different times to finish once popped.
- * If "timestamp" is used, the queue will at least be loosely ordered
- * by timestamp, allowing for some jobs to be popped off out of order.
- * If "random" is used, pop() will pick jobs in random order.
- * Note that it may only be weakly random (e.g. a lottery of the oldest X).
- * If "any" is choosen, the queue will use whatever order is the fastest.
- * This might be useful for improving concurrency for job acquisition.
- * - claimTTL : If supported, the queue will recycle jobs that have been popped
- * but not acknowledged as completed after this many seconds. Recycling
- * of jobs simple means re-inserting them into the queue. Jobs can be
- * attempted up to three times before being discarded.
+ * - class : What job class to use (determines job type)
+ * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
+ * - type : The name of the job types this queue handles
+ * - order : Order that pop() selects jobs, one of "fifo", "timestamp", "random" or "any".
+ * If "fifo" is used, the queue will effectively be FIFO. Note that job
+ * completion will not appear to be exactly FIFO if there are multiple
+ * job runners since jobs can take different times to finish once popped.
+ * If "timestamp" is used, the queue will at least be loosely ordered
+ * by timestamp, allowing for some jobs to be popped off out of order.
+ * If "random" is used, pop() will pick jobs in random order.
+ * Note that it may only be weakly random (e.g. a lottery of the oldest X).
+ * If "any" is chosen, the queue will use whatever order is the fastest.
+ * This might be useful for improving concurrency for job acquisition.
+ * - claimTTL : If supported, the queue will recycle jobs that have been popped
+ * but not acknowledged as completed after this many seconds. Recycling
+ * of jobs simply means re-inserting them into the queue. Jobs can be
+ * attempted up to three times before being discarded.
+ * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions.
+ * This lets delayed jobs wait in a staging area until a given timestamp is
+ * reached, at which point they will enter the queue. If this is not enabled
+ * or not supported, an exception will be thrown on delayed job insertion.
*
* Queue classes should throw an exception if they do not support the options given.
*
abstract protected function optimalOrder();
/**
- * Quickly check if the queue is empty (has no available jobs).
+ * @return boolean Whether delayed jobs are supported
+ */
+ protected function supportsDelayedJobs() {
+ // Base implementation: delayed jobs are not implemented here, so the
+ // constructor rejects the 'checkDelay' option while this returns false.
+ return false;
+ }
+
+ /**
+ * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
* Queue classes should use caching if they are any slower without memcached.
*
* If caching is used, this might return false when there are actually no jobs.
abstract protected function doIsEmpty();
/**
- * Get the number of available (unacquired) jobs in the queue.
+ * Get the number of available (unacquired, non-delayed) jobs in the queue.
* Queue classes should use caching if they are any slower without memcached.
*
* If caching is used, this number might be out of date for a minute.
*/
abstract protected function doGetAcquiredCount();
+ /**
+ * Count the delayed jobs, i.e. the jobs that are temporarily out of the queue.
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, the returned number might be out of date for a minute.
+ *
+ * @return integer Number of delayed jobs
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function getDelayedCount() {
+ // Profile the backend-specific lookup and hand its result back unchanged
+ wfProfileIn( __METHOD__ );
+ $delayedCount = $this->doGetDelayedCount();
+ wfProfileOut( __METHOD__ );
+
+ return $delayedCount;
+ }
+
+ /**
+ * Backend for getDelayedCount(); this default implementation always reports zero.
+ *
+ * @see JobQueue::getDelayedCount()
+ * @return integer
+ */
+ protected function doGetDelayedCount() {
+ // not implemented
+ return 0;
+ }
+
/**
* Push a single jobs into the queue.
* This does not require $wgJobClasses to be set for the given job type.
* Outside callers should use JobQueueGroup::push() instead of this function.
*
* @param $jobs Job|Array
- * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
+ * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
* @return bool Returns false on failure
* @throws MWException
*/
* Outside callers should use JobQueueGroup::push() instead of this function.
*
* @param array $jobs List of Jobs
- * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
+ * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC)
* @return bool Returns false on failure
* @throws MWException
*/
foreach ( $jobs as $job ) {
if ( $job->getType() !== $this->type ) {
- throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ throw new MWException(
+ "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
+ } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) {
+ throw new MWException(
+ "Got delayed '{$job->getType()}' job; delays are not supported." );
}
}
wfProfileIn( __METHOD__ );
$job = $this->doPop();
wfProfileOut( __METHOD__ );
+
+ // Flag this job as an old duplicate based on its "root" job...
+ try {
+ if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
+ wfIncrStats( 'job-pop-duplicate' );
+ $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
+ }
+ } catch ( MWException $e ) {} // don't lose jobs over this
+
return $job;
}
* @return bool
*/
protected function doDeduplicateRootJob( Job $job ) {
- return true;
+ global $wgMemc;
+
+ // Both root job parameters are mandatory for registering the root job
+ $params = $job->getParams();
+ if ( !isset( $params['rootJobSignature'] ) ) {
+ throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+ } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+ throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+ }
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+ // Callers should insert the jobs first and then call this function, so that if the
+ // insert fails, the de-duplication registration is aborted. Having only the
+ // de-duplication registration succeed would cause jobs to become no-ops without
+ // any actual jobs that made them redundant.
+ // NOTE(review): this implementation updates the cache immediately; a queue that
+ // defers its inserts may need to override this to keep that ordering - confirm.
+ $timestamp = $wgMemc->get( $key ); // current last timestamp of this job
+ if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+ return true; // a newer version of this root job was enqueued
+ }
+
+ // Update the timestamp of the last root job started at the location.
+ // ROOTJOB_TTL is declared on this class, so refer to it via self:: instead of
+ // reaching into a specific subclass.
+ return $wgMemc->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL );
+ }
+
+ /**
+ * Determine whether a newer "root" job has superseded the root job of the given job
+ *
+ * @param $job Job
+ * @return bool
+ * @throws MWException If the job's type differs from the type of this queue
+ */
+ final protected function isRootJobOldDuplicate( Job $job ) {
+ // Only jobs of this queue's own type may be checked
+ if ( $this->type !== $job->getType() ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+
+ wfProfileIn( __METHOD__ );
+ $superseded = $this->doIsRootJobOldDuplicate( $job );
+ wfProfileOut( __METHOD__ );
+
+ return $superseded;
+ }
+
+ /**
+ * Backend for isRootJobOldDuplicate(): compares the job's root job timestamp
+ * with the last timestamp stored in the cache for the same root job signature.
+ *
+ * @see JobQueue::isRootJobOldDuplicate()
+ * @param Job $job
+ * @return bool
+ */
+ protected function doIsRootJobOldDuplicate( Job $job ) {
+ global $wgMemc;
+
+ $jobParams = $job->getParams();
+ if ( !isset( $jobParams['rootJobSignature'] ) ) {
+ // job has no de-duplication info
+ return false;
+ }
+ if ( !isset( $jobParams['rootJobTimestamp'] ) ) {
+ trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." );
+ return false;
+ }
+
+ // Get the last time this root job was enqueued
+ $cacheKey = $this->getRootJobCacheKey( $jobParams['rootJobSignature'] );
+ $lastEnqueued = $wgMemc->get( $cacheKey );
+ if ( !$lastEnqueued ) {
+ // nothing usable is stored for this root job
+ return false;
+ }
+
+ // Check if a new root job was started at the location after this one's...
+ return ( $lastEnqueued > $jobParams['rootJobTimestamp'] );
+ }
+
+ /**
+ * Build the cache key used to store the last timestamp of a root job
+ *
+ * @param string $signature Hash identifier of the root job
+ * @return string
+ */
+ protected function getRootJobCacheKey( $signature ) {
+ // Split this queue's wiki ID into the two parts expected by wfForeignMemcKey()
+ list( $wikiDb, $wikiPrefix ) = wfSplitWikiID( $this->wiki );
+
+ return wfForeignMemcKey(
+ $wikiDb, $wikiPrefix, 'jobqueue', $this->type, 'rootjob', $signature
+ );
}
/**
protected function doFlushCaches() {}
/**
- * Get an iterator to traverse over all of the jobs in this queue.
- * This does not include jobs that are current acquired. In general,
- * this should only be called on a queue that is no longer being popped.
+ * Get an iterator to traverse over all available jobs in this queue.
+ * This does not include jobs that are currently acquired or delayed.
+ * This should only be called on a queue that is no longer being popped.
*
* @return Iterator|Traversable|Array
* @throws MWException
*/
abstract public function getAllQueuedJobs();
+ /**
+ * Get an iterator to traverse over all delayed jobs in this queue.
+ * This should only be called on a queue that is no longer being popped.
+ *
+ * This default implementation returns an empty array.
+ *
+ * @return Iterator|Traversable|Array
+ * @throws MWException
+ * @since 1.22
+ */
+ public function getAllDelayedJobs() {
+ // not implemented
+ return array();
+ }
+
/**
* Namespace the queue with a key to isolate it for testing
*