X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjob%2FJobQueue.php;h=9c152cdf693960d6d52fc34d06827a4770f2e911;hb=0f22a50c909662e7a88af2e8603cd7a16d7b7183;hp=7ce654b3bead128ba84ea2d2b323d671319a745f;hpb=4feea6a4ec1fe0a1f7a2518d89c293fdb2df5f22;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index 7ce654b3be..9c152cdf69 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -33,39 +33,59 @@ abstract class JobQueue { protected $type; // string; job type protected $order; // string; job priority for pop() protected $claimTTL; // integer; seconds + protected $maxTries; // integer; maximum number of times to try a job + protected $checkDelay; // boolean; allow delayed jobs const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions - const MAX_ATTEMPTS = 3; // integer; number of times to try a job + const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days) /** * @param $params array */ protected function __construct( array $params ) { - $this->wiki = $params['wiki']; - $this->type = $params['type']; - $this->order = isset( $params['order'] ) ? $params['order'] : 'random'; + $this->wiki = $params['wiki']; + $this->type = $params['type']; $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0; + $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3; + if ( isset( $params['order'] ) && $params['order'] !== 'any' ) { + $this->order = $params['order']; + } else { + $this->order = $this->optimalOrder(); + } + if ( !in_array( $this->order, $this->supportedOrders() ) ) { + throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); + } + $this->checkDelay = !empty( $params['checkDelay'] ); + if ( $this->checkDelay && !$this->supportsDelayedJobs() ) { + throw new MWException( __CLASS__ . " does not support delayed jobs." ); + } } /** * Get a job queue object of the specified type. * $params includes: - * - class : What job class to use (determines job type) - * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) - * - type : The name of the job types this queue handles - * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". - * If "fifo" is used, the queue will effectively be FIFO. Note that - * job completion will not appear to be exactly FIFO if there are multiple - * job runners since jobs can take different times to finish once popped. - * If "timestamp" is used, the queue will at least be loosely ordered - * by timestamp, allowing for some jobs to be popped off out of order. - * If "random" is used, pop() will pick jobs in random order. This might be - * useful for improving concurrency depending on the queue storage medium. - * - claimTTL : If supported, the queue will recycle jobs that have been popped - * but not acknowledged as completed after this many seconds. Recycling - * of jobs simple means re-inserting them into the queue. Jobs can be - * attempted up to three times before being discarded. + * - class : What job class to use (determines job type) + * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * - type : The name of the job types this queue handles + * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". + * If "fifo" is used, the queue will effectively be FIFO. Note that job + * completion will not appear to be exactly FIFO if there are multiple + * job runners since jobs can take different times to finish once popped. + * If "timestamp" is used, the queue will at least be loosely ordered + * by timestamp, allowing for some jobs to be popped off out of order. + * If "random" is used, pop() will pick jobs in random order. + * Note that it may only be weakly random (e.g. a lottery of the oldest X). + * If "any" is choosen, the queue will use whatever order is the fastest. + * This might be useful for improving concurrency for job acquisition. + * - claimTTL : If supported, the queue will recycle jobs that have been popped + * but not acknowledged as completed after this many seconds. Recycling + * of jobs simple means re-inserting them into the queue. Jobs can be + * attempted up to three times before being discarded. + * - checkDelay : If supported, respect Job::getReleaseTimestamp() in the push functions. + * This lets delayed jobs wait in a staging area until a given timestamp is + * reached, at which point they will enter the queue. If this is not enabled + * or not supported, an exception will be thrown on delayed job insertion. * * Queue classes should throw an exception if they do not support the options given. * @@ -100,9 +120,38 @@ abstract class JobQueue { } /** - * Quickly check if the queue is empty (has no available jobs). + * @return string One of (random, timestamp, fifo) + */ + final public function getOrder() { + return $this->order; + } + + /** + * @return Array Subset of (random, timestamp, fifo) + */ + abstract protected function supportedOrders(); + + /** + * @return string One of (random, timestamp, fifo) + */ + abstract protected function optimalOrder(); + + /** + * @return boolean Whether delayed jobs are supported + */ + protected function supportsDelayedJobs() { + return false; // not implemented + } + + /** + * Quickly check if the queue has no available (unacquired, non-delayed) jobs. * Queue classes should use caching if they are any slower without memcached. * + * If caching is used, this might return false when there are actually no jobs. + * If pop() is called and returns false then it should correct the cache. Also, + * calling flushCaches() first prevents this. However, this affect is typically + * not distinguishable from the race condition between isEmpty() and pop(). + * * @return bool * @throws MWException */ @@ -120,9 +169,11 @@ abstract class JobQueue { abstract protected function doIsEmpty(); /** - * Get the number of available jobs in the queue. + * Get the number of available (unacquired, non-delayed) jobs in the queue. * Queue classes should use caching if they are any slower without memcached. * + * If caching is used, this number might be out of date for a minute. + * * @return integer * @throws MWException */ @@ -143,6 +194,8 @@ abstract class JobQueue { * Get the number of acquired jobs (these are temporarily out of the queue). * Queue classes should use caching if they are any slower without memcached. * + * If caching is used, this number might be out of date for a minute. + * * @return integer * @throws MWException */ @@ -159,9 +212,35 @@ abstract class JobQueue { */ abstract protected function doGetAcquiredCount(); + /** + * Get the number of delayed jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return integer + * @throws MWException + * @since 1.22 + */ + final public function getDelayedCount() { + wfProfileIn( __METHOD__ ); + $res = $this->doGetDelayedCount(); + wfProfileOut( __METHOD__ ); + return $res; + } + + /** + * @see JobQueue::getDelayedCount() + * @return integer + */ + protected function doGetDelayedCount() { + return 0; // not implemented + } + /** * Push a single jobs into the queue. * This does not require $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::push() instead of this function. * * @param $jobs Job|Array * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) @@ -169,26 +248,34 @@ abstract class JobQueue { * @throws MWException */ final public function push( $jobs, $flags = 0 ) { - $jobs = is_array( $jobs ) ? $jobs : array( $jobs ); - - return $this->batchPush( $jobs, $flags ); + return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags ); } /** * Push a batch of jobs into the queue. * This does not require $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::push() instead of this function. * - * @param $jobs array List of Jobs + * @param array $jobs List of Jobs * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic) * @return bool Returns false on failure * @throws MWException */ final public function batchPush( array $jobs, $flags = 0 ) { + if ( !count( $jobs ) ) { + return true; // nothing to do + } + foreach ( $jobs as $job ) { if ( $job->getType() !== $this->type ) { - throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + throw new MWException( + "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); + } elseif ( $job->getReleaseTimestamp() && !$this->checkDelay ) { + throw new MWException( + "Got delayed '{$job->getType()}' job; delays are not supported." ); } } + wfProfileIn( __METHOD__ ); $ok = $this->doBatchPush( $jobs, $flags ); wfProfileOut( __METHOD__ ); @@ -204,8 +291,9 @@ abstract class JobQueue { /** * Pop a job off of the queue. * This requires $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::pop() instead of this function. * - * @return Job|bool Returns false on failure + * @return Job|bool Returns false if there are no jobs * @throws MWException */ final public function pop() { @@ -221,6 +309,15 @@ abstract class JobQueue { wfProfileIn( __METHOD__ ); $job = $this->doPop(); wfProfileOut( __METHOD__ ); + + // Flag this job as an old duplicate based on its "root" job... + try { + if ( $job && $this->isRootJobOldDuplicate( $job ) ) { + wfIncrStats( 'job-pop-duplicate' ); + $job = DuplicateJob::newFromJob( $job ); // convert to a no-op + } + } catch ( MWException $e ) {} // don't lose jobs over this + return $job; } @@ -234,6 +331,7 @@ abstract class JobQueue { * Acknowledge that a job was completed. * * This does nothing for certain queue classes or if "claimTTL" is not set. + * Outside callers should use JobQueueGroup::ack() instead of this function. * * @param $job Job * @return bool @@ -302,7 +400,76 @@ abstract class JobQueue { * @return bool */ protected function doDeduplicateRootJob( Job $job ) { - return true; + global $wgMemc; + + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); + } + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $timestamp = $wgMemc->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } + + /** + * Check if the "root" job of a given job has been superseded by a newer one + * + * @param $job Job + * @return bool + * @throws MWException + */ + final protected function isRootJobOldDuplicate( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + wfProfileIn( __METHOD__ ); + $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); + wfProfileOut( __METHOD__ ); + return $isDuplicate; + } + + /** + * @see JobQueue::isRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + global $wgMemc; + + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + return false; // job has no de-deplication info + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + trigger_error( "Cannot check root job; missing 'rootJobTimestamp'." ); + return false; + } + + // Get the last time this root job was enqueued + $timestamp = $wgMemc->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + protected function getRootJobCacheKey( $signature ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); } /** @@ -369,4 +536,37 @@ abstract class JobQueue { * @return void */ protected function doFlushCaches() {} + + /** + * Get an iterator to traverse over all available jobs in this queue. + * This does not include jobs that are currently acquired or delayed. + * This should only be called on a queue that is no longer being popped. + * + * @return Iterator|Traversable|Array + * @throws MWException + */ + abstract public function getAllQueuedJobs(); + + /** + * Get an iterator to traverse over all delayed jobs in this queue. + * This should only be called on a queue that is no longer being popped. + * + * @return Iterator|Traversable|Array + * @throws MWException + * @since 1.22 + */ + public function getAllDelayedJobs() { + return array(); // not implemented + } + + /** + * Namespace the queue with a key to isolate it for testing + * + * @param $key string + * @return void + * @throws MWException + */ + public function setTestingPrefix( $key ) { + throw new MWException( "Queue namespacing not supported for this queue type." ); + } }