/** @var Array */
protected static $instances = array();
+ /** @var ProcessCacheLRU */
+ protected $cache;
+
protected $wiki; // string; wiki ID
- const TYPE_DEFAULT = 1; // integer; job not in $wgJobTypesExcludedFromDefaultQueue
+ const TYPE_DEFAULT = 1; // integer; jobs popped by default
const TYPE_ANY = 2; // integer; any job
+ const USE_CACHE = 1; // integer; use process cache
+
+ const PROC_CACHE_TTL = 15; // integer; seconds
+
/**
* @param $wiki string Wiki ID
*/
protected function __construct( $wiki ) {
$this->wiki = $wiki;
+ $this->cache = new ProcessCacheLRU( 1 );
}
/**
return self::$instances[$wiki];
}
+ /**
+ * Destroy the singleton instances
+ *
+ * @return void
+ */
+ public static function destroySingletons() {
+ self::$instances = array();
+ }
+
/**
* @param $type string
* @return JobQueue Job queue object for a given queue type
}
/**
- * Insert jobs into the respective queues of with the belong
+ * Insert jobs into the respective queues of with the belong.
+ * This inserts the jobs into the queue specified by $wgJobTypeConf.
*
* @param $jobs Job|array A single Job or a list of Jobs
+ * @throws MWException
* @return bool
*/
public function push( $jobs ) {
$ok = true;
foreach ( $jobsByType as $type => $jobs ) {
- if ( !$this->get( $type )->batchPush( $jobs ) ) {
+ if ( !$this->get( $type )->push( $jobs ) ) {
$ok = false;
}
}
+ if ( $this->cache->has( 'queues-ready', 'list' ) ) {
+ $list = $this->cache->get( 'queues-ready', 'list' );
+ if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
+ $this->cache->clear( 'queues-ready' );
+ }
+ }
+
return $ok;
}
/**
* Pop a job off one of the job queues
*
- * @param $type integer JobQueueGroup::TYPE_* constant
+ * @param $queueType integer JobQueueGroup::TYPE_* constant
+ * @param $flags integer Bitfield of JobQueueGroup::USE_* constants
* @return Job|bool Returns false on failure
*/
- public function pop( $type = self::TYPE_DEFAULT ) {
- $types = ( $type == self::TYPE_DEFAULT )
- ? $this->getDefaultQueueTypes()
- : $this->getQueueTypes();
+ public function pop( $queueType = self::TYPE_DEFAULT, $flags = 0 ) {
+ if ( $flags & self::USE_CACHE ) {
+ if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
+ $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
+ }
+ $types = $this->cache->get( 'queues-ready', 'list' );
+ } else {
+ $types = $this->getQueuesWithJobs();
+ }
+
+ if ( $queueType == self::TYPE_DEFAULT ) {
+ $types = array_intersect( $types, $this->getDefaultQueueTypes() );
+ }
shuffle( $types ); // avoid starvation
foreach ( $types as $type ) { // for each queue...
$job = $this->get( $type )->pop();
- if ( $job ) {
- return $job; // found
+ if ( $job ) { // found
+ return $job;
+ } else { // not found
+ $this->cache->clear( 'queues-ready' );
}
}
return $this->get( $job->getType() )->ack( $job );
}
+ /**
+ * Register the "root job" of a given job into the queue for de-duplication.
+ * This should only be called right *after* all the new jobs have been inserted.
+ *
+ * @param $job Job
+ * @return bool
+ */
+ public function deduplicateRootJob( Job $job ) {
+ return $this->get( $job->getType() )->deduplicateRootJob( $job );
+ }
+
/**
* Get the list of queue types
*
}
return $types;
}
+
+ /**
+ * @return Array List of default job types that have non-empty queues
+ */
+ public function getDefaultQueuesWithJobs() {
+ $types = array();
+ foreach ( $this->getDefaultQueueTypes() as $type ) {
+ if ( !$this->get( $type )->isEmpty() ) {
+ $types[] = $type;
+ }
+ }
+ return $types;
+ }
}