*/
public function push( $jobs ) {
$jobs = is_array( $jobs ) ? $jobs : array( $jobs );
+ if ( !count( $jobs ) ) {
+ return true;
+ }
$jobsByType = array(); // (job type => list of jobs)
foreach ( $jobs as $job ) {
*/
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0 ) {
if ( is_string( $qtype ) ) { // specific job type
- if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $qtype ) ) {
- return false; // back off
- }
$job = $this->get( $qtype )->pop();
if ( !$job ) {
JobQueueAggregator::singleton()->notifyQueueEmpty( $this->wiki, $qtype );
shuffle( $types ); // avoid starvation
foreach ( $types as $type ) { // for each queue...
- if ( ( $flags & self::USE_PRIORITY ) && $this->isQueueDeprioritized( $type ) ) {
- continue; // back off
- }
$job = $this->get( $type )->pop();
if ( $job ) { // found
return $job;
return $this->coalescedQueues;
}
- /**
- * Check if jobs should not be popped of a queue right now.
- * This is only used for performance, such as to avoid spamming
- * the queue with many sub-jobs before they actually get run.
- *
- * @param string $type
- * @return bool
- */
- public function isQueueDeprioritized( $type ) {
- if ( $this->cache->has( 'isDeprioritized', $type, 5 ) ) {
- return $this->cache->get( 'isDeprioritized', $type );
- }
- if ( $type === 'refreshLinks2' ) {
- // Don't keep converting refreshLinks2 => refreshLinks jobs if the
- // later jobs have not been done yet. This helps throttle queue spam.
- $deprioritized = !$this->get( 'refreshLinks' )->isEmpty();
- $this->cache->set( 'isDeprioritized', $type, $deprioritized );
-
- return $deprioritized;
- }
-
- return false;
- }
-
/**
* Execute any due periodic queue maintenance tasks for all queues.
*
}
}
}
+ // The tasks may have recycled jobs or release delayed jobs into the queue
+ if ( isset( $tasksRun[$type] ) && !$queue->isEmpty() ) {
+ JobQueueAggregator::singleton()->notifyQueueNonEmpty( $this->wiki, $type );
+ }
}
$wgMemc->merge( $key, function ( $cache, $key, $lastRuns ) use ( $tasksRun ) {