protected $type; // string; job type
protected $order; // string; job priority for pop()
protected $claimTTL; // integer; seconds
+ protected $maxTries; // integer; maximum number of times to try a job
const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
- const MAX_ATTEMPTS = 3; // integer; number of times to try a job
-
/**
* @param $params array
*/
protected function __construct( array $params ) {
- $this->wiki = $params['wiki'];
- $this->type = $params['type'];
- $this->order = isset( $params['order'] ) ? $params['order'] : 'random';
+ $this->wiki = $params['wiki'];
+ $this->type = $params['type'];
+ $this->order = isset( $params['order'] ) ? $params['order'] : 'random';
$this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
+ $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
}
/**
* by timestamp, allowing for some jobs to be popped off out of order.
* If "random" is used, pop() will pick jobs in random order. This might be
* useful for improving concurrency depending on the queue storage medium.
+ * Note that "random" really means "don't care", so it may actually be FIFO
+ * or only weakly random (e.g. pop() takes one of the first X jobs randomly).
* - claimTTL : If supported, the queue will recycle jobs that have been popped
* but not acknowledged as completed after this many seconds. Recycling
* of jobs simple means re-inserting them into the queue. Jobs can be
* Queue classes should use caching if they are any slower without memcached.
*
* @return bool
+ * @throws MWException
*/
final public function isEmpty() {
wfProfileIn( __METHOD__ );
* Queue classes should use caching if they are any slower without memcached.
*
* @return integer
+ * @throws MWException
*/
final public function getSize() {
wfProfileIn( __METHOD__ );
* Queue classes should use caching if they are any slower without memcached.
*
* @return integer
+ * @throws MWException
*/
final public function getAcquiredCount() {
wfProfileIn( __METHOD__ );
/**
* Push a single jobs into the queue.
* This does not require $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::push() instead of this function.
*
* @param $jobs Job|Array
* @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
- * @throws MWException
* @return bool Returns false on failure
+ * @throws MWException
*/
final public function push( $jobs, $flags = 0 ) {
- $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
-
- return $this->batchPush( $jobs, $flags );
+ return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
}
/**
* Push a batch of jobs into the queue.
* This does not require $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::push() instead of this function.
*
* @param $jobs array List of Jobs
* @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
- * @throws MWException
* @return bool Returns false on failure
+ * @throws MWException
*/
final public function batchPush( array $jobs, $flags = 0 ) {
+ if ( !count( $jobs ) ) {
+ return true; // nothing to do
+ }
foreach ( $jobs as $job ) {
if ( $job->getType() !== $this->type ) {
throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
}
}
+
wfProfileIn( __METHOD__ );
$ok = $this->doBatchPush( $jobs, $flags );
wfProfileOut( __METHOD__ );
/**
* Pop a job off of the queue.
* This requires $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::pop() instead of this function.
*
- * @return Job|bool Returns false on failure
+ * @return Job|bool Returns false if there are no jobs
+ * @throws MWException
*/
final public function pop() {
+ global $wgJobClasses;
+
+ if ( $this->wiki !== wfWikiID() ) {
+ throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
+ } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
+ // Do not pop jobs if there is no class for the queue type
+ throw new MWException( "Unrecognized job type '{$this->type}'." );
+ }
+
wfProfileIn( __METHOD__ );
$job = $this->doPop();
wfProfileOut( __METHOD__ );
* Acknowledge that a job was completed.
*
* This does nothing for certain queue classes or if "claimTTL" is not set.
+ * Outside callers should use JobQueueGroup::ack() instead of this function.
*
* @param $job Job
- * @throws MWException
* @return bool
+ * @throws MWException
*/
final public function ack( Job $job ) {
if ( $job->getType() !== $this->type ) {
* This does nothing for certain queue classes.
*
* @param $job Job
- * @throws MWException
* @return bool
+ * @throws MWException
*/
final public function deduplicateRootJob( Job $job ) {
if ( $job->getType() !== $this->type ) {
* This does nothing for certain queue classes.
*
* @return void
+ * @throws MWException
*/
final public function waitForBackups() {
wfProfileIn( __METHOD__ );
* @return void
*/
protected function doWaitForBackups() {}
+
+ /**
+ * Return a map of task names to task definition maps.
+ * A "task" is a fast periodic queue maintenance action.
+ * Mutually exclusive tasks must implement their own locking in the callback.
+ *
+ * Each task value is an associative array with:
+ * - name : the name of the task
+ * - callback : a PHP callable that performs the task
+ * - period : the period in seconds corresponding to the task frequency
+ *
+ * @return Array
+ */
+ final public function getPeriodicTasks() {
+ $tasks = $this->doGetPeriodicTasks();
+ foreach ( $tasks as $name => &$def ) {
+ $def['name'] = $name;
+ }
+ return $tasks;
+ }
+
+ /**
+ * @see JobQueue::getPeriodicTasks()
+ * @return Array
+ */
+ protected function doGetPeriodicTasks() {
+ return array();
+ }
+
+ /**
+ * Clear any process and persistent caches
+ *
+ * @return void
+ */
+ final public function flushCaches() {
+ wfProfileIn( __METHOD__ );
+ $this->doFlushCaches();
+ wfProfileOut( __METHOD__ );
+ }
+
+ /**
+ * @see JobQueue::flushCaches()
+ * @return void
+ */
+ protected function doFlushCaches() {}
+
+ /**
+ * Namespace the queue with a key to isolate it for testing
+ *
+ * @param $key string
+ * @return void
+ * @throws MWException
+ */
+ public function setTestingPrefix( $key ) {
+ throw new MWException( "Queue namespacing not supported for this queue type." );
+ }
}