) {
unset( $baseConfig[$o] ); // partition queue doesn't care about this
}
+ // The class handles all aggregator calls already
+ unset( $baseConfig['aggregator'] );
// Get the partition queue objects
foreach ( $partitionMap as $partition => $w ) {
if ( !isset( $params['configByPartition'][$partition] ) ) {
return false;
}
- protected function doDeduplicateRootJob( Job $job ) {
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
$params = $job->getRootJobParams();
$sigature = $params['rootJobSignature'];
$partition = $this->partitionRing->getLiveLocation( $sigature );
$this->throwErrorIfAllPartitionsDown( $failed );
}
- protected function doGetPeriodicTasks() {
- $tasks = array();
- /** @var JobQueue $queue */
- foreach ( $this->partitionQueues as $partition => $queue ) {
- foreach ( $queue->getPeriodicTasks() as $task => $def ) {
- $tasks["{$partition}:{$task}"] = $def;
- }
- }
-
- return $tasks;
- }
-
protected function doFlushCaches() {
- static $types = array(
- 'empty',
- 'size',
- 'acquiredcount',
- 'delayedcount',
- 'abandonedcount'
- );
-
/** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
$queue->doFlushCaches();
return $iterator;
}
+ public function getAllAcquiredJobs() {
+ $iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $iterator->append( $queue->getAllAcquiredJobs() );
+ }
+
+ return $iterator;
+ }
+
public function getAllAbandonedJobs() {
$iterator = new AppendIterator();
throw new JobQueueError( 'No queue partitions available.' );
}
}
-
- public function setTestingPrefix( $key ) {
- /** @var JobQueue $queue */
- foreach ( $this->partitionQueues as $queue ) {
- $queue->setTestingPrefix( $key );
- }
- }
}