* @since 1.22
*/
class JobQueueFederated extends JobQueue {
- /** @var Array (partition name => weight) reverse sorted by weight */
+ /** @var array (partition name => weight) reverse sorted by weight */
protected $partitionMap = array();
- /** @var Array (partition name => JobQueue) reverse sorted by weight */
+ /** @var array (partition name => JobQueue) reverse sorted by weight */
protected $partitionQueues = array();
/** @var HashRing */
/** @var BagOStuff */
protected $cache;
- protected $maxPartitionsTry; // integer; maximum number of partitions to try
+ /** @var int Maximum number of partitions to try */
+ protected $maxPartitionsTry;
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
* during failure, at the cost of added latency and somewhat
* less reliable job de-duplication mechanisms.
* @param array $params
+ * @throws MWException
*/
protected function __construct( array $params ) {
parent::__construct( $params );
/**
* @param string $type
* @param string $method
- * @return integer
+ * @return int
*/
protected function getCrossPartitionSum( $type, $method ) {
$key = $this->getCacheKey( $type );
/**
* @param array $jobs
* @param HashRing $partitionRing
- * @param integer $flags
+ * @param int $flags
+ * @throws JobQueueError
* @return array List of Job object that could not be inserted
*/
protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
// to use a consistent hash to avoid allowing duplicate jobs per partition.
// When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
$uJobsByPartition = array(); // (partition name => job list)
+ /** @var Job $job */
foreach ( $jobs as $key => $job ) {
if ( $job->ignoreDuplicates() ) {
$sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
// Insert the de-duplicated jobs into the queues...
foreach ( $uJobsByPartition as $partition => $jobBatch ) {
+ /** @var JobQueue $queue */
$queue = $this->partitionQueues[$partition];
try {
$ok = $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
if ( $partition === false ) {
break; // all partitions at 0 weight
}
+
+ /** @var JobQueue $queue */
$queue = $this->partitionQueues[$partition];
try {
$job = $queue->pop();
}
protected function doDelete() {
+ /** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
try {
$queue->doDelete();
}
protected function doWaitForBackups() {
+ /** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
try {
$queue->waitForBackups();
protected function doGetPeriodicTasks() {
$tasks = array();
+ /** @var JobQueue $queue */
foreach ( $this->partitionQueues as $partition => $queue ) {
foreach ( $queue->getPeriodicTasks() as $task => $def ) {
$tasks["{$partition}:{$task}"] = $def;
'delayedcount',
'abandonedcount'
);
+
foreach ( $types as $type ) {
$this->cache->delete( $this->getCacheKey( $type ) );
}
+
+ /** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
$queue->doFlushCaches();
}
public function getAllQueuedJobs() {
$iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
$iterator->append( $queue->getAllQueuedJobs() );
}
public function getAllDelayedJobs() {
$iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
$iterator->append( $queue->getAllDelayedJobs() );
}
protected function doGetSiblingQueuesWithJobs( array $types ) {
$result = array();
+
+ /** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
try {
$nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
protected function doGetSiblingQueueSizes( array $types ) {
$result = array();
+
+ /** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
try {
$sizes = $queue->doGetSiblingQueueSizes( $types );
}
public function setTestingPrefix( $key ) {
+ /** @var JobQueue $queue */
foreach ( $this->partitionQueues as $queue ) {
$queue->setTestingPrefix( $key );
}
}
/**
+ * @param $property
* @return string
*/
private function getCacheKey( $property ) {