fix some spacing
[lhc/web/wiklou.git] / includes / job / JobQueue.php
index 6cdc948..b7bbfe6 100644 (file)
@@ -33,19 +33,19 @@ abstract class JobQueue {
        protected $type; // string; job type
        protected $order; // string; job priority for pop()
        protected $claimTTL; // integer; seconds
+       protected $maxTries; // integer; maximum number of times to try a job
 
        const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
 
-       const MAX_ATTEMPTS = 3; // integer; number of times to try a job
-
        /**
         * @param $params array
         */
        protected function __construct( array $params ) {
-               $this->wiki     = $params['wiki'];
-               $this->type     = $params['type'];
-               $this->order    = isset( $params['order'] ) ? $params['order'] : 'random';
+               $this->wiki = $params['wiki'];
+               $this->type = $params['type'];
+               $this->order = isset( $params['order'] ) ? $params['order'] : 'random';
                $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
+               $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
        }
 
        /**
@@ -62,6 +62,8 @@ abstract class JobQueue {
         *                by timestamp, allowing for some jobs to be popped off out of order.
         *                If "random" is used, pop() will pick jobs in random order. This might be
         *                useful for improving concurrency depending on the queue storage medium.
+        *                Note that "random" really means "don't care", so it may actually be FIFO
+        *                or only weakly random (e.g. pop() takes one of the first X jobs randomly).
         *   - claimTTL : If supported, the queue will recycle jobs that have been popped
         *                but not acknowledged as completed after this many seconds. Recycling
         *                of jobs simple means re-inserting them into the queue. Jobs can be
@@ -104,6 +106,7 @@ abstract class JobQueue {
         * Queue classes should use caching if they are any slower without memcached.
         *
         * @return bool
+        * @throws MWException
         */
        final public function isEmpty() {
                wfProfileIn( __METHOD__ );
@@ -123,6 +126,7 @@ abstract class JobQueue {
         * Queue classes should use caching if they are any slower without memcached.
         *
         * @return integer
+        * @throws MWException
         */
        final public function getSize() {
                wfProfileIn( __METHOD__ );
@@ -142,6 +146,7 @@ abstract class JobQueue {
         * Queue classes should use caching if they are any slower without memcached.
         *
         * @return integer
+        * @throws MWException
         */
        final public function getAcquiredCount() {
                wfProfileIn( __METHOD__ );
@@ -159,33 +164,37 @@ abstract class JobQueue {
        /**
         * Push a single jobs into the queue.
         * This does not require $wgJobClasses to be set for the given job type.
+        * Outside callers should use JobQueueGroup::push() instead of this function.
         *
         * @param $jobs Job|Array
         * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
-        * @throws MWException
         * @return bool Returns false on failure
+        * @throws MWException
         */
        final public function push( $jobs, $flags = 0 ) {
-               $jobs = is_array( $jobs ) ? $jobs : array( $jobs );
-
-               return $this->batchPush( $jobs, $flags );
+               return $this->batchPush( is_array( $jobs ) ? $jobs : array( $jobs ), $flags );
        }
 
        /**
         * Push a batch of jobs into the queue.
         * This does not require $wgJobClasses to be set for the given job type.
+        * Outside callers should use JobQueueGroup::push() instead of this function.
         *
         * @param $jobs array List of Jobs
         * @param $flags integer Bitfield (supports JobQueue::QoS_Atomic)
-        * @throws MWException
         * @return bool Returns false on failure
+        * @throws MWException
         */
        final public function batchPush( array $jobs, $flags = 0 ) {
+               if ( !count( $jobs ) ) {
+                       return true; // nothing to do
+               }
                foreach ( $jobs as $job ) {
                        if ( $job->getType() !== $this->type ) {
                                throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
                        }
                }
+
                wfProfileIn( __METHOD__ );
                $ok = $this->doBatchPush( $jobs, $flags );
                wfProfileOut( __METHOD__ );
@@ -201,10 +210,21 @@ abstract class JobQueue {
        /**
         * Pop a job off of the queue.
         * This requires $wgJobClasses to be set for the given job type.
+        * Outside callers should use JobQueueGroup::pop() instead of this function.
         *
-        * @return Job|bool Returns false on failure
+        * @return Job|bool Returns false if there are no jobs
+        * @throws MWException
         */
        final public function pop() {
+               global $wgJobClasses;
+
+               if ( $this->wiki !== wfWikiID() ) {
+                       throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
+               } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
+                       // Do not pop jobs if there is no class for the queue type
+                       throw new MWException( "Unrecognized job type '{$this->type}'." );
+               }
+
                wfProfileIn( __METHOD__ );
                $job = $this->doPop();
                wfProfileOut( __METHOD__ );
@@ -221,10 +241,11 @@ abstract class JobQueue {
         * Acknowledge that a job was completed.
         *
         * This does nothing for certain queue classes or if "claimTTL" is not set.
+        * Outside callers should use JobQueueGroup::ack() instead of this function.
         *
         * @param $job Job
-        * @throws MWException
         * @return bool
+        * @throws MWException
         */
        final public function ack( Job $job ) {
                if ( $job->getType() !== $this->type ) {
@@ -270,8 +291,8 @@ abstract class JobQueue {
         * This does nothing for certain queue classes.
         *
         * @param $job Job
-        * @throws MWException
         * @return bool
+        * @throws MWException
         */
        final public function deduplicateRootJob( Job $job ) {
                if ( $job->getType() !== $this->type ) {
@@ -298,6 +319,7 @@ abstract class JobQueue {
         * This does nothing for certain queue classes.
         *
         * @return void
+        * @throws MWException
         */
        final public function waitForBackups() {
                wfProfileIn( __METHOD__ );
@@ -310,4 +332,60 @@ abstract class JobQueue {
         * @return void
         */
        protected function doWaitForBackups() {}
+
+       /**
+        * Return a map of task names to task definition maps.
+        * A "task" is a fast periodic queue maintenance action.
+        * Mutually exclusive tasks must implement their own locking in the callback.
+        *
+        * Each task value is an associative array with:
+        *   - name     : the name of the task
+        *   - callback : a PHP callable that performs the task
+        *   - period   : the period in seconds corresponding to the task frequency
+        *
+        * @return Array
+        */
+       final public function getPeriodicTasks() {
+               $tasks = $this->doGetPeriodicTasks();
+               foreach ( $tasks as $name => &$def ) {
+                       $def['name'] = $name;
+               }
+               return $tasks;
+       }
+
+       /**
+        * @see JobQueue::getPeriodicTasks()
+        * @return Array
+        */
+       protected function doGetPeriodicTasks() {
+               return array();
+       }
+
+       /**
+        * Clear any process and persistent caches
+        *
+        * @return void
+        */
+       final public function flushCaches() {
+               wfProfileIn( __METHOD__ );
+               $this->doFlushCaches();
+               wfProfileOut( __METHOD__ );
+       }
+
+       /**
+        * @see JobQueue::flushCaches()
+        * @return void
+        */
+       protected function doFlushCaches() {}
+
+       /**
+        * Namespace the queue with a key to isolate it for testing
+        *
+        * @param $key string
+        * @return void
+        * @throws MWException
+        */
+       public function setTestingPrefix( $key ) {
+               throw new MWException( "Queue namespacing not supported for this queue type." );
+       }
 }