const TYPE_ANY = 2; // integer; any job
const USE_CACHE = 1; // integer; use process or persistent cache
- const USE_PRIORITY = 2; // integer; respect deprioritization
const PROC_CACHE_TTL = 15; // integer; seconds
* This pops a job off a queue as specified by $wgJobTypeConf and
* updates the aggregate job queue information cache as needed.
*
- * @param int|string $qtype JobQueueGroup::TYPE_DEFAULT or type string
+ * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string
* @param int $flags Bitfield of JobQueueGroup::USE_* constants
+ * @param array $blacklist List of job types to ignore
* @return Job|bool Returns false on failure
*/
- public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
+ public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = array() ) {
+ $job = false;
+
if ( is_string( $qtype ) ) { // specific job type
- if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) {
- return false; // back off
- }
- $job = $this->get( $qtype )->pop();
- if ( !$job ) {
- JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
+ if ( !in_array( $qtype, $blacklist ) ) {
+ $job = $this->get( $qtype )->pop();
+ if ( !$job ) {
+ JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
+ }
}
-
- return $job;
} else { // any job in the "default" jobs types
if ( $flags & self::USE_CACHE ) {
if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
if ( $qtype == self::TYPE_DEFAULT ) {
$types = array_intersect( $types, $this->getDefaultQueueTypes() );
}
+
+ $types = array_diff( $types, $blacklist ); // avoid selected types
shuffle( $types ); // avoid starvation
foreach ( $types as $type ) { // for each queue...
- if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) {
- continue; // back off
- }
$job = $this->get( $type )->pop();
if ( $job ) { // found
- return $job;
+ break;
} else { // not found
JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $type );
$this->cache->clear( 'queues-ready' );
}
}
-
- return false; // no jobs found
}
+
+ return $job;
}
/**
return $this->coalescedQueues;
}
- /**
- * Check if jobs should not be popped of a queue right now.
- * This is only used for performance, such as to avoid spamming
- * the queue with many sub-jobs before they actually get run.
- *
- * @param string $type
- * @return bool
- */
- public function isQueueDeprioritized( $type ) {
- if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) {
- return $this->cache->get( 'isDeprioritized', $type );
- }
- if ( $type === 'refreshLinks2' ) {
- // Don't keep converting refreshLinksPartition => refreshLinks jobs if the
- // later jobs have not been done yet. This helps throttle queue spam.
- // @TODO: this is mostly a WMF-specific hack and should be removed when
- // refreshLinks2 jobs are drained.
- $deprioritized = !$this->get( 'refreshLinks' )->getSize() > 10000;
- $this->cache->set( 'isDeprioritized', $type, $deprioritized );
-
- return $deprioritized;
- }
-
- return false;
- }
-
/**
* Execute any due periodic queue maintenance tasks for all queues.
*
}
}
}
+ // The tasks may have recycled jobs or release delayed jobs into the queue
+ if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
+ JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
+ }
}
$wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {