/** @var JobQueueGroup[] */
protected static $instances = [];
- /** @var ProcessCacheLRU */
+ /** @var MapCacheLRU */
protected $cache;
/** @var string Wiki domain ID */
} else {
$conf = $conf + $wgJobTypeConf['default'];
}
- $conf['aggregator'] = JobQueueAggregator::singleton();
if ( !isset( $conf['readOnlyReason'] ) ) {
$conf['readOnlyReason'] = $this->readOnlyReason;
}
+ $services = MediaWikiServices::getInstance();
+ $conf['stats'] = $services->getStatsdDataFactory();
+ $conf['wanCache'] = $services->getMainWANObjectCache();
+ $conf['stash'] = $services->getMainObjectStash();
+
return JobQueue::factory( $conf );
}
/**
* Buffer jobs for insertion via push() or call it now if in CLI mode
*
- * Note that pushLazyJobs() is registered as a deferred update just before
- * DeferredUpdates::doUpdates() in MediaWiki and JobRunner classes in order
- * to be executed as the very last deferred update (T100085, T154425).
- *
* @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
* @return void
* @since 1.26
DeferredUpdates::addUpdate( new JobQueueEnqueueUpdate( $this->domain, $jobs ) );
}
- /**
- * Push all jobs buffered via lazyPush() into their respective queues
- *
- * @return void
- * @since 1.26
- * @deprecated Since 1.33 Not needed anymore
- */
- public static function pushLazyJobs() {
- wfDeprecated( __METHOD__, '1.33' );
- }
-
/**
* Pop a job off one of the job queues
*
* @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string
* @param int $flags Bitfield of JobQueueGroup::USE_* constants
* @param array $blacklist List of job types to ignore
- * @return Job|bool Returns false on failure
+ * @return RunnableJob|bool Returns false on failure
*/
public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
+ global $wgJobClasses;
+
$job = false;
+ if ( !WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
+ throw new JobQueueError(
+ "Cannot pop '{$qtype}' job off foreign '{$this->domain}' wiki queue." );
+ } elseif ( is_string( $qtype ) && !isset( $wgJobClasses[$qtype] ) ) {
+ // Do not pop jobs if there is no class for the queue type
+ throw new JobQueueError( "Unrecognized job type '$qtype'." );
+ }
+
if ( is_string( $qtype ) ) { // specific job type
if ( !in_array( $qtype, $blacklist ) ) {
$job = $this->get( $qtype )->pop();
/**
* Acknowledge that a job was completed
*
- * @param Job $job
+ * @param RunnableJob $job
* @return void
*/
- public function ack( Job $job ) {
+ public function ack( RunnableJob $job ) {
$this->get( $job->getType() )->ack( $job );
}
* Register the "root job" of a given job into the queue for de-duplication.
* This should only be called right *after* all the new jobs have been inserted.
*
- * @param Job $job
+ * @param RunnableJob $job
* @return bool
*/
- public function deduplicateRootJob( Job $job ) {
+ public function deduplicateRootJob( RunnableJob $job ) {
return $this->get( $job->getType() )->deduplicateRootJob( $job );
}
/**
* Get the list of job types that have non-empty queues
*
- * @return array List of job types that have non-empty queues
+ * @return string[] List of job types that have non-empty queues
*/
public function getQueuesWithJobs() {
$types = [];
foreach ( $this->getCoalescedQueues() as $info ) {
- $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
+ /** @var JobQueue $queue */
+ $queue = $info['queue'];
+ $nonEmpty = $queue->getSiblingQueuesWithJobs( $this->getQueueTypes() );
if ( is_array( $nonEmpty ) ) { // batching features supported
$types = array_merge( $types, $nonEmpty );
} else { // we have to go through the queues in the bucket one-by-one
/**
* Get the size of the queus for a list of job types
*
- * @return array Map of (job type => size)
+ * @return int[] Map of (job type => size)
*/
public function getQueueSizes() {
$sizeMap = [];
foreach ( $this->getCoalescedQueues() as $info ) {
- $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
+ /** @var JobQueue $queue */
+ $queue = $info['queue'];
+ $sizes = $queue->getSiblingQueueSizes( $this->getQueueTypes() );
if ( is_array( $sizes ) ) { // batching features supported
$sizeMap = $sizeMap + $sizes;
} else { // we have to go through the queues in the bucket one-by-one
}
/**
- * @return array
+ * @return JobQueue[]
*/
protected function getCoalescedQueues() {
global $wgJobTypeConf;