class JobQueueFederated extends JobQueue {
/** @var HashRing */
protected $partitionRing;
- /** @var array (partition name => JobQueue) reverse sorted by weight */
- protected $partitionQueues = array();
+ /** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */
+ protected $partitionQueues = [];
/** @var int Maximum number of partitions to try */
protected $maxPartitionsTry;
arsort( $partitionMap, SORT_NUMERIC );
// Get the config to pass to merge into each partition queue config
$baseConfig = $params;
- foreach ( array( 'class', 'sectionsByWiki', 'maxPartitionsTry',
- 'partitionsBySection', 'configByPartition', ) as $o
+ foreach ( [ 'class', 'sectionsByWiki', 'maxPartitionsTry',
+ 'partitionsBySection', 'configByPartition', ] as $o
) {
unset( $baseConfig[$o] ); // partition queue doesn't care about this
}
+ // The class handles all aggregator calls already
+ unset( $baseConfig['aggregator'] );
// Get the partition queue objects
foreach ( $partitionMap as $partition => $w ) {
if ( !isset( $params['configByPartition'][$partition] ) ) {
protected function supportedOrders() {
// No FIFO due to partitioning, though "rough timestamp order" is supported
- return array( 'undefined', 'random', 'timestamp' );
+ return [ 'undefined', 'random', 'timestamp' ];
}
protected function optimalOrder() {
* @return array List of Job object that could not be inserted
*/
protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
- $jobsLeft = array();
+ $jobsLeft = [];
// Because jobs are spread across partitions, per-job de-duplication needs
// to use a consistent hash to avoid allowing duplicate jobs per partition.
// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
- $uJobsByPartition = array(); // (partition name => job list)
+ $uJobsByPartition = []; // (partition name => job list)
/** @var Job $job */
foreach ( $jobs as $key => $job ) {
if ( $job->ignoreDuplicates() ) {
}
// Get the batches of jobs that are not de-duplicated
if ( $flags & self::QOS_ATOMIC ) {
- $nuJobBatches = array( $jobs ); // all or nothing
+ $nuJobBatches = [ $jobs ]; // all or nothing
} else {
// Split the jobs into batches and spread them out over servers if there
// are many jobs. This helps keep the partitions even. Otherwise, send all
throw new MWException( "The given job has no defined partition name." );
}
- return $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
+ $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
}
protected function doIsRootJobOldDuplicate( Job $job ) {
- $params = $job->getRootJobParams();
- $sigature = $params['rootJobSignature'];
- $partition = $this->partitionRing->getLiveLocation( $sigature );
+ $signature = $job->getRootJobParams()['rootJobSignature'];
+ $partition = $this->partitionRing->getLiveLocation( $signature );
try {
return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
} catch ( JobQueueError $e ) {
if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
- $partition = $this->partitionRing->getLiveLocation( $sigature );
+ $partition = $this->partitionRing->getLiveLocation( $signature );
return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
}
}
return false;
}
- protected function doDeduplicateRootJob( Job $job ) {
- $params = $job->getRootJobParams();
- $sigature = $params['rootJobSignature'];
- $partition = $this->partitionRing->getLiveLocation( $sigature );
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
+ $signature = $job->getRootJobParams()['rootJobSignature'];
+ $partition = $this->partitionRing->getLiveLocation( $signature );
try {
return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
} catch ( JobQueueError $e ) {
if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
- $partition = $this->partitionRing->getLiveLocation( $sigature );
+ $partition = $this->partitionRing->getLiveLocation( $signature );
return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
}
}
$this->throwErrorIfAllPartitionsDown( $failed );
}
- protected function doGetPeriodicTasks() {
- $tasks = array();
- /** @var JobQueue $queue */
- foreach ( $this->partitionQueues as $partition => $queue ) {
- foreach ( $queue->getPeriodicTasks() as $task => $def ) {
- $tasks["{$partition}:{$task}"] = $def;
- }
- }
-
- return $tasks;
- }
-
protected function doFlushCaches() {
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
}
protected function doGetSiblingQueuesWithJobs( array $types ) {
- $result = array();
+ $result = [];
$failed = 0;
/** @var JobQueue $queue */
}
protected function doGetSiblingQueueSizes( array $types ) {
- $result = array();
+ $result = [];
$failed = 0;
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
throw new JobQueueError( 'No queue partitions available.' );
}
}
-
- public function setTestingPrefix( $key ) {
- /** @var JobQueue $queue */
- foreach ( $this->partitionQueues as $queue ) {
- $queue->setTestingPrefix( $key );
- }
- }
}