wiki = $wiki;
	}

	/**
	 * @param $wiki string Wiki ID
	 * @return JobQueueGroup
	 */
	public static function singleton( $wiki = false ) {
		$wiki = ( $wiki === false ) ? wfWikiID() : $wiki;
		if ( !isset( self::$instances[$wiki] ) ) {
			self::$instances[$wiki] = new self( $wiki );
		}
		return self::$instances[$wiki];
	}

	/**
	 * @param $type string
	 * @return JobQueue Job queue object for a given queue type
	 */
	public function get( $type ) {
		global $wgJobTypeConf;

		$conf = array( 'wiki' => $this->wiki, 'type' => $type );
		// Use the per-type configuration if present, else the default one.
		// The array union keeps the 'wiki' and 'type' keys already set above.
		if ( isset( $wgJobTypeConf[$type] ) ) {
			$conf = $conf + $wgJobTypeConf[$type];
		} else {
			$conf = $conf + $wgJobTypeConf['default'];
		}

		return JobQueue::factory( $conf );
	}

	/**
	 * Insert jobs into the respective queues to which they belong
	 *
	 * @param $jobs Job|array A single Job or a list of Jobs
	 * @throws MWException If any item is not a Job object
	 * @return bool
	 */
	public function push( $jobs ) {
		$jobs = is_array( $jobs ) ? $jobs : array( $jobs );

		$jobsByType = array(); // (job type => list of jobs)
		foreach ( $jobs as $job ) {
			if ( $job instanceof Job ) {
				$jobsByType[$job->getType()][] = $job;
			} else {
				throw new MWException( "Attempted to push a non-Job object into a queue." );
			}
		}

		// Push each batch to its own queue; report failure if any batch fails
		$ok = true;
		foreach ( $jobsByType as $type => $jobsOfType ) {
			if ( !$this->get( $type )->batchPush( $jobsOfType ) ) {
				$ok = false;
			}
		}

		return $ok;
	}

	/**
	 * Pop a job off one of the job queues
	 *
	 * @param $type integer JobQueueGroup::TYPE_* constant
	 * @return Job|bool Returns false on failure
	 */
	public function pop( $type = self::TYPE_DEFAULT ) {
		$types = ( $type == self::TYPE_DEFAULT )
			? $this->getDefaultQueueTypes()
			: $this->getQueueTypes();
		shuffle( $types ); // avoid starvation

		foreach ( $types as $queueType ) { // for each queue...
			$job = $this->get( $queueType )->pop();
			if ( $job ) {
				return $job; // found
			}
		}

		return false; // no jobs found
	}

	/**
	 * Acknowledge that a job was completed
	 *
	 * @param $job Job
	 * @return bool
	 */
	public function ack( Job $job ) {
		return $this->get( $job->getType() )->ack( $job );
	}

	/**
	 * Get the list of queue types
	 *
	 * @return array List of strings
	 */
	public function getQueueTypes() {
		global $wgJobClasses;

		return array_keys( $wgJobClasses );
	}

	/**
	 * Get the list of default queue types
	 *
	 * @return array List of strings
	 */
	public function getDefaultQueueTypes() {
		global $wgJobTypesExcludedFromDefaultQueue;

		return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
	}

	/**
	 * @return array List of job types that have non-empty queues
	 */
	public function getQueuesWithJobs() {
		$types = array();
		foreach ( $this->getQueueTypes() as $type ) {
			if ( !$this->get( $type )->isEmpty() ) {
				$types[] = $type;
			}
		}

		return $types;
	}
}
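
/*
 * Usage sketch (illustrative only, not part of the original class).
 * It assumes $job is an instance of some Job subclass built elsewhere and
 * that Job exposes a run() method returning true on success; neither is
 * shown in this file, so treat both as assumptions.
 *
 *     $group = JobQueueGroup::singleton(); // queue group for the current wiki
 *     $group->push( $job );                // routed to the queue for $job->getType()
 *
 *     $next = $group->pop();               // false if no default queue yields a job
 *     if ( $next ) {
 *         if ( $next->run() ) {            // assumed Job API, see note above
 *             $group->ack( $next );        // tell the owning queue the job completed
 *         }
 *     }
 */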