X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjob%2FJobQueue.php;h=17a1338f6704fde5570ef308629c3ab17a659f23;hb=aca92231392c9664f13358c587cbfee568a5d19d;hp=b0dd9258855602000203dfbcafa2369523bfbce1;hpb=b929f19df9a1c89f3b1b91ee3bd67c57839a4e7d;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index b0dd925885..17a1338f67 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -34,8 +34,12 @@ abstract class JobQueue { protected $order; // string; job priority for pop() protected $claimTTL; // integer; seconds protected $maxTries; // integer; maximum number of times to try a job + protected $checkDelay; // boolean; allow delayed jobs - const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions + const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions + const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions (b/c) + + const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days) /** * @param $params array @@ -53,28 +57,36 @@ abstract class JobQueue { if ( !in_array( $this->order, $this->supportedOrders() ) ) { throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); } + $this->checkDelay = !empty( $params['checkDelay'] ); + if ( $this->checkDelay && !$this->supportsDelayedJobs() ) { + throw new MWException( __CLASS__ . " does not support delayed jobs." ); + } } /** * Get a job queue object of the specified type. * $params includes: - * - class : What job class to use (determines job type) - * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) - * - type : The name of the job types this queue handles - * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". - * If "fifo" is used, the queue will effectively be FIFO. Note that - * job completion will not appear to be exactly FIFO if there are multiple - * job runners since jobs can take different times to finish once popped. - * If "timestamp" is used, the queue will at least be loosely ordered - * by timestamp, allowing for some jobs to be popped off out of order. - * If "random" is used, pop() will pick jobs in random order. - * Note that it may only be weakly random (e.g. a lottery of the oldest X). - * If "any" is choosen, the queue will use whatever order is the fastest. - * This might be useful for improving concurrency for job acquisition. - * - claimTTL : If supported, the queue will recycle jobs that have been popped - * but not acknowledged as completed after this many seconds. Recycling - * of jobs simple means re-inserting them into the queue. Jobs can be - * attempted up to three times before being discarded. + * - class : What job class to use (determines job type) + * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * - type : The name of the job types this queue handles + * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". + * If "fifo" is used, the queue will effectively be FIFO. Note that job + * completion will not appear to be exactly FIFO if there are multiple + * job runners since jobs can take different times to finish once popped. + * If "timestamp" is used, the queue will at least be loosely ordered + * by timestamp, allowing for some jobs to be popped off out of order. + * If "random" is used, pop() will pick jobs in random order. + * Note that it may only be weakly random (e.g. a lottery of the oldest X). + * If "any" is choosen, the queue will use whatever order is the fastest. + * This might be useful for improving concurrency for job acquisition. + * - claimTTL : If supported, the queue will recycle jobs that have been popped + * but not acknowledged as completed after this many seconds. Recycling + * of jobs simple means re-inserting them into the queue. Jobs can be + * attempted up to three times before being discarded. + * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions. + * This lets delayed jobs wait in a staging area until a given timestamp is + * reached, at which point they will enter the queue. If this is not enabled + * or not supported, an exception will be thrown on delayed job insertion. * * Queue classes should throw an exception if they do not support the options given. * @@ -126,7 +138,14 @@ abstract class JobQueue { abstract protected function optimalOrder(); /** - * Quickly check if the queue is empty (has no available jobs). + * @return boolean Whether delayed jobs are supported + */ + protected function supportsDelayedJobs() { + return false; // not implemented + } + + /** + * Quickly check if the queue has no available (unacquired, non-delayed) jobs. * Queue classes should use caching if they are any slower without memcached. * * If caching is used, this might return false when there are actually no jobs. @@ -151,7 +170,7 @@ abstract class JobQueue { abstract protected function doIsEmpty(); /** - * Get the number of available (unacquired) jobs in the queue. + * Get the number of available (unacquired, non-delayed) jobs in the queue. * Queue classes should use caching if they are any slower without memcached. * * If caching is used, this number might be out of date for a minute. @@ -194,13 +213,62 @@ abstract class JobQueue { */ abstract protected function doGetAcquiredCount(); + /** + * Get the number of delayed jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws MWException + * @since 1.22 + */ + final public function getDelayedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetDelayedCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getDelayedCount() + * @return integer + */ + protected function doGetDelayedCount() { + return 0; // not implemented + } + + /** + * Get the number of acquired jobs that can no longer be attempted. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws MWException + */ + final public function getAbandonedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetAbandonedCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getAbandonedCount() + * @return integer + */ + protected function doGetAbandonedCount() { + return 0; // not implemented + } + /** * Push a single jobs into the queue. * This does not require $wgJobClasses to be set for the given job type. * Outside callers should use JobQueueGroup::push() instead of this function. * * @param $jobs Job|Array - * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) + * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) * @return bool Returns false on failure * @throws MWException */ @@ -214,7 +282,7 @@ abstract class JobQueue { * Outside callers should use JobQueueGroup::push() instead of this function. * * @param array $jobs List of Jobs - * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) + * @param $flags integer Bitfield (supports JobQueue::QOS_ATOMIC) * @return bool Returns false on failure * @throws MWException */ @@ -225,7 +293,11 @@ abstract class JobQueue { foreach ( $jobs as $job ) { if ( $job->getType() !== $this->type ) { - throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + throw new MWException( + "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); + } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) { + throw new MWException( + "Got delayed '{$job->getType()}' job; delays are not supported." ); } } @@ -262,6 +334,15 @@ abstract class JobQueue { wfProfileIn( __METHOD__ ); $job = $this->doPop(); wfProfileOut( __METHOD__ ); + + // Flag this job as an old duplicate based on its "root" job... + try { + if ( $job && $this->isRootJobOldDuplicate( $job ) ) { + wfIncrStats( 'job-pop-duplicate' ); + $job = DuplicateJob::newFromJob( $job ); // convert to a no-op + } + } catch ( MWException $e ) {} // don't lose jobs over this + return $job; } @@ -344,7 +425,72 @@ abstract class JobQueue { * @return bool */ protected function doDeduplicateRootJob( Job $job ) { - return true; + global $wgMemc; + + if ( !$job->hasRootJobParams() ) { + throw new MWException( "Cannot register root job; missing parameters." ); + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $timestamp = $wgMemc->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } + + /** + * Check if the "root" job of a given job has been superseded by a newer one + * + * @param $job Job + * @return bool + * @throws MWException + */ + final protected function isRootJobOldDuplicate( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); + wfProfileOut( __METHOD__ ); + return $isDuplicate; + } + + /** + * @see JobQueue::isRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + global $wgMemc; + + if ( !$job->hasRootJobParams() ) { + return false; // job has no de-deplication info + } + $params = $job->getRootJobParams(); + + // Get the last time this root job was enqueued + $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + protected function getRootJobCacheKey( $signature ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); } /** @@ -413,15 +559,27 @@ abstract class JobQueue { protected function doFlushCaches() {} /** - * Get an iterator to traverse over all of the jobs in this queue. - * This does not include jobs that are current acquired. In general, - * this should only be called on a queue that is no longer being popped. + * Get an iterator to traverse over all available jobs in this queue. + * This does not include jobs that are currently acquired or delayed. + * This should only be called on a queue that is no longer being popped. * - * @return Iterator|Traversable|Array + * @return Iterator * @throws MWException */ abstract public function getAllQueuedJobs(); + /** + * Get an iterator to traverse over all delayed jobs in this queue. + * This should only be called on a queue that is no longer being popped. + * + * @return Iterator + * @throws MWException + * @since 1.22 + */ + public function getAllDelayedJobs() { + return new ArrayIterator( array() ); // not implemented + } + /** * Namespace the queue with a key to isolate it for testing *