X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjob%2FJobQueueGroup.php;h=cf0215b8bfb83be6c17b818f4fcc1e2e0dfbce55;hb=469064d8cd73c676b187ea5606deb951fd9a6330;hp=4ebd531d153c86368f4f0282e5011ae7ecd628e8;hpb=629df62ac2f8cc8639d9a470e2e6495b1aaf8f1b;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/job/JobQueueGroup.php b/includes/job/JobQueueGroup.php index 4ebd531d15..cf0215b8bf 100644 --- a/includes/job/JobQueueGroup.php +++ b/includes/job/JobQueueGroup.php @@ -25,22 +25,30 @@ * Class to handle enqueueing of background jobs * * @ingroup JobQueue - * @since 1.20 + * @since 1.21 */ class JobQueueGroup { /** @var Array */ protected static $instances = array(); + /** @var ProcessCacheLRU */ + protected $cache; + protected $wiki; // string; wiki ID - const TYPE_DEFAULT = 1; // integer; job not in $wgJobTypesExcludedFromDefaultQueue + const TYPE_DEFAULT = 1; // integer; jobs popped by default const TYPE_ANY = 2; // integer; any job + const USE_CACHE = 1; // integer; use process cache + + const PROC_CACHE_TTL = 15; // integer; seconds + /** * @param $wiki string Wiki ID */ protected function __construct( $wiki ) { $this->wiki = $wiki; + $this->cache = new ProcessCacheLRU( 1 ); } /** @@ -55,6 +63,15 @@ class JobQueueGroup { return self::$instances[$wiki]; } + /** + * Destroy the singleton instances + * + * @return void + */ + public static function destroySingletons() { + self::$instances = array(); + } + /** * @param $type string * @return JobQueue Job queue object for a given queue type @@ -73,9 +90,11 @@ class JobQueueGroup { } /** - * Insert jobs into the respective queues of with the belong + * Insert jobs into the respective queues of with the belong. + * This inserts the jobs into the queue specified by $wgJobTypeConf. * * @param $jobs Job|array A single Job or a list of Jobs + * @throws MWException * @return bool */ public function push( $jobs ) { @@ -92,30 +111,49 @@ class JobQueueGroup { $ok = true; foreach ( $jobsByType as $type => $jobs ) { - if ( !$this->get( $type )->batchPush( $jobs ) ) { + if ( !$this->get( $type )->push( $jobs ) ) { $ok = false; } } + if ( $this->cache->has( 'queues-ready', 'list' ) ) { + $list = $this->cache->get( 'queues-ready', 'list' ); + if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) { + $this->cache->clear( 'queues-ready' ); + } + } + return $ok; } /** * Pop a job off one of the job queues * - * @param $type integer JobQueueGroup::TYPE_* constant + * @param $queueType integer JobQueueGroup::TYPE_* constant + * @param $flags integer Bitfield of JobQueueGroup::USE_* constants * @return Job|bool Returns false on failure */ - public function pop( $type = self::TYPE_DEFAULT ) { - $types = ( $type == self::TYPE_DEFAULT ) - ? $this->getDefaultQueueTypes() - : $this->getQueueTypes(); + public function pop( $queueType = self::TYPE_DEFAULT, $flags = 0 ) { + if ( $flags & self::USE_CACHE ) { + if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { + $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() ); + } + $types = $this->cache->get( 'queues-ready', 'list' ); + } else { + $types = $this->getQueuesWithJobs(); + } + + if ( $queueType == self::TYPE_DEFAULT ) { + $types = array_intersect( $types, $this->getDefaultQueueTypes() ); + } shuffle( $types ); // avoid starvation foreach ( $types as $type ) { // for each queue... $job = $this->get( $type )->pop(); - if ( $job ) { - return $job; // found + if ( $job ) { // found + return $job; + } else { // not found + $this->cache->clear( 'queues-ready' ); } } @@ -132,6 +170,17 @@ class JobQueueGroup { return $this->get( $job->getType() )->ack( $job ); } + /** + * Register the "root job" of a given job into the queue for de-duplication. + * This should only be called right *after* all the new jobs have been inserted. + * + * @param $job Job + * @return bool + */ + public function deduplicateRootJob( Job $job ) { + return $this->get( $job->getType() )->deduplicateRootJob( $job ); + } + /** * Get the list of queue types * @@ -153,4 +202,30 @@ class JobQueueGroup { return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); } + + /** + * @return Array List of job types that have non-empty queues + */ + public function getQueuesWithJobs() { + $types = array(); + foreach ( $this->getQueueTypes() as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } + return $types; + } + + /** + * @return Array List of default job types that have non-empty queues + */ + public function getDefaultQueuesWithJobs() { + $types = array(); + foreach ( $this->getDefaultQueueTypes() as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } + return $types; + } }