Update ternary notation
[lhc/web/wiklou.git] / includes / job / JobQueueRedis.php
index 378e175..9b9fe2d 100644 (file)
@@ -60,17 +60,21 @@ class JobQueueRedis extends JobQueue {
        /** @var RedisConnectionPool */
        protected $redisPool;
 
-       protected $server; // string; server address
-       protected $compression; // string; compression method to use
+       /** @var string Server address */
+       protected $server;
+
+       /** @var string Compression method to use */
+       protected $compression;
 
        const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
 
-       protected $key; // string; key to prefix the queue keys with (used for testing)
+       /** @var string Key to prefix the queue keys with (used for testing) */
+       protected $key;
 
        /**
         * @params include:
         *   - redisConfig : An array of parameters to RedisConnectionPool::__construct().
-        *                   Note that the serializer option is ignored "none" is always used.
+        *                   Note that the serializer option is ignored as "none" is always used.
         *   - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
         *                   If a hostname is specified but no port, the standard port number
         *                   6379 will be used. Required.
@@ -108,7 +112,7 @@ class JobQueueRedis extends JobQueue {
 
        /**
         * @see JobQueue::doGetSize()
-        * @return integer
+        * @return int
         * @throws MWException
         */
        protected function doGetSize() {
@@ -122,8 +126,8 @@ class JobQueueRedis extends JobQueue {
 
        /**
         * @see JobQueue::doGetAcquiredCount()
-        * @return integer
-        * @throws MWException
+        * @return int
+        * @throws JobQueueError
         */
        protected function doGetAcquiredCount() {
                if ( $this->claimTTL <= 0 ) {
@@ -134,6 +138,7 @@ class JobQueueRedis extends JobQueue {
                        $conn->multi( Redis::PIPELINE );
                        $conn->zSize( $this->getQueueKey( 'z-claimed' ) );
                        $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
+
                        return array_sum( $conn->exec() );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -142,8 +147,8 @@ class JobQueueRedis extends JobQueue {
 
        /**
         * @see JobQueue::doGetDelayedCount()
-        * @return integer
-        * @throws MWException
+        * @return int
+        * @throws JobQueueError
         */
        protected function doGetDelayedCount() {
                if ( !$this->checkDelay ) {
@@ -159,8 +164,8 @@ class JobQueueRedis extends JobQueue {
 
        /**
         * @see JobQueue::doGetAbandonedCount()
-        * @return integer
-        * @throws MWException
+        * @return int
+        * @throws JobQueueError
         */
        protected function doGetAbandonedCount() {
                if ( $this->claimTTL <= 0 ) {
@@ -179,7 +184,7 @@ class JobQueueRedis extends JobQueue {
         * @param array $jobs
         * @param $flags
         * @return bool
-        * @throws MWException
+        * @throws JobQueueError
         */
        protected function doBatchPush( array $jobs, $flags ) {
                // Convert the jobs into field maps (de-duplicated against each other)
@@ -217,6 +222,7 @@ class JobQueueRedis extends JobQueue {
                        }
                        if ( $failed > 0 ) {
                                wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." );
+
                                return false;
                        }
                        JobQueue::incrStats( 'job-insert', $this->type, count( $items ) );
@@ -232,7 +238,7 @@ class JobQueueRedis extends JobQueue {
        /**
         * @param RedisConnRef $conn
         * @param array $items List of results from JobQueueRedis::getNewJobFields()
-        * @return integer Number of jobs inserted (duplicates are ignored)
+        * @return int Number of jobs inserted (duplicates are ignored)
         * @throws RedisException
         */
        protected function pushBlobs( RedisConnRef $conn, array $items ) {
@@ -245,23 +251,24 @@ class JobQueueRedis extends JobQueue {
                }
                static $script =
 <<<LUA
+               local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
                if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
                local pushed = 0
                for i = 1,#ARGV,4 do
                        local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
-                       if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then
+                       if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
                                if 1*rtimestamp > 0 then
                                        -- Insert into delayed queue (release time as score)
-                                       redis.call('zAdd',KEYS[4],rtimestamp,id)
+                                       redis.call('zAdd',kDelayed,rtimestamp,id)
                                else
                                        -- Insert into unclaimed queue
-                                       redis.call('lPush',KEYS[1],id)
+                                       redis.call('lPush',kUnclaimed,id)
                                end
                                if sha1 ~= '' then
-                                       redis.call('hSet',KEYS[2],id,sha1)
-                                       redis.call('hSet',KEYS[3],sha1,id)
+                                       redis.call('hSet',kSha1ById,id,sha1)
+                                       redis.call('hSet',kIdBySha1,sha1,id)
                                end
-                               redis.call('hSet',KEYS[5],id,blob)
+                               redis.call('hSet',kData,id,blob)
                                pushed = pushed + 1
                        end
                end
@@ -285,7 +292,7 @@ LUA;
        /**
         * @see JobQueue::doPop()
         * @return Job|bool
-        * @throws MWException
+        * @throws JobQueueError
         */
        protected function doPop() {
                $job = false;
@@ -337,16 +344,17 @@ LUA;
        protected function popAndDeleteBlob( RedisConnRef $conn ) {
                static $script =
 <<<LUA
+               local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
                -- Pop an item off the queue
-               local id = redis.call('rpop',KEYS[1])
+               local id = redis.call('rpop',kUnclaimed)
                if not id then return false end
                -- Get the job data and remove it
-               local item = redis.call('hGet',KEYS[4],id)
-               redis.call('hDel',KEYS[4],id)
+               local item = redis.call('hGet',kData,id)
+               redis.call('hDel',kData,id)
                -- Allow new duplicates of this job
-               local sha1 = redis.call('hGet',KEYS[2],id)
-               if sha1 then redis.call('hDel',KEYS[3],sha1) end
-               redis.call('hDel',KEYS[2],id)
+               local sha1 = redis.call('hGet',kSha1ById,id)
+               if sha1 then redis.call('hDel',kIdBySha1,sha1) end
+               redis.call('hDel',kSha1ById,id)
                -- Return the job data
                return item
 LUA;
@@ -369,17 +377,18 @@ LUA;
        protected function popAndAcquireBlob( RedisConnRef $conn ) {
                static $script =
 <<<LUA
+               local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
                -- Pop an item off the queue
-               local id = redis.call('rPop',KEYS[1])
+               local id = redis.call('rPop',kUnclaimed)
                if not id then return false end
                -- Allow new duplicates of this job
-               local sha1 = redis.call('hGet',KEYS[2],id)
-               if sha1 then redis.call('hDel',KEYS[3],sha1) end
-               redis.call('hDel',KEYS[2],id)
+               local sha1 = redis.call('hGet',kSha1ById,id)
+               if sha1 then redis.call('hDel',kIdBySha1,sha1) end
+               redis.call('hDel',kSha1ById,id)
                -- Mark the jobs as claimed and return it
-               redis.call('zAdd',KEYS[4],ARGV[1],id)
-               redis.call('hIncrBy',KEYS[5],id,1)
-               return redis.call('hGet',KEYS[6],id)
+               redis.call('zAdd',kClaimed,ARGV[1],id)
+               redis.call('hIncrBy',kAttempts,id,1)
+               return redis.call('hGet',kData,id)
 LUA;
                return $conn->luaEval( $script,
                        array(
@@ -399,7 +408,7 @@ LUA;
         * @see JobQueue::doAck()
         * @param Job $job
         * @return Job|bool
-        * @throws MWException
+        * @throws MWException|JobQueueError
         */
        protected function doAck( Job $job ) {
                if ( !isset( $job->metadata['uuid'] ) ) {
@@ -410,11 +419,12 @@ LUA;
                        try {
                                static $script =
 <<<LUA
+                               local kClaimed, kAttempts, kData = unpack(KEYS)
                                -- Unmark the job as claimed
-                               redis.call('zRem',KEYS[1],ARGV[1])
-                               redis.call('hDel',KEYS[2],ARGV[1])
+                               redis.call('zRem',kClaimed,ARGV[1])
+                               redis.call('hDel',kAttempts,ARGV[1])
                                -- Delete the job data itself
-                               return redis.call('hDel',KEYS[3],ARGV[1])
+                               return redis.call('hDel',kData,ARGV[1])
 LUA;
                                $res = $conn->luaEval( $script,
                                        array(
@@ -428,12 +438,14 @@ LUA;
 
                                if ( !$res ) {
                                        wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+
                                        return false;
                                }
                        } catch ( RedisException $e ) {
                                $this->throwRedisException( $this->server, $conn, $e );
                        }
                }
+
                return true;
        }
 
@@ -441,7 +453,7 @@ LUA;
         * @see JobQueue::doDeduplicateRootJob()
         * @param Job $job
         * @return bool
-        * @throws MWException
+        * @throws MWException|JobQueueError
         */
        protected function doDeduplicateRootJob( Job $job ) {
                if ( !$job->hasRootJobParams() ) {
@@ -457,6 +469,7 @@ LUA;
                        if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                                return true; // a newer version of this root job was enqueued
                        }
+
                        // Update the timestamp of the last root job started at the location...
                        return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
                } catch ( RedisException $e ) {
@@ -468,6 +481,7 @@ LUA;
         * @see JobQueue::doIsRootJobOldDuplicate()
         * @param Job $job
         * @return bool
+        * @throws JobQueueError
         */
        protected function doIsRootJobOldDuplicate( Job $job ) {
                if ( !$job->hasRootJobParams() ) {
@@ -490,6 +504,7 @@ LUA;
        /**
         * @see JobQueue::doDelete()
         * @return bool
+        * @throws JobQueueError
         */
        protected function doDelete() {
                static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned',
@@ -501,6 +516,7 @@ LUA;
                        foreach ( $props as $prop ) {
                                $keys[] = $this->getQueueKey( $prop );
                        }
+
                        return ( $conn->delete( $keys ) !== false );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -515,12 +531,15 @@ LUA;
                $conn = $this->getConnection();
                try {
                        $that = $this;
+
                        return new MappedIterator(
                                $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ),
-                               function( $uid ) use ( $that, $conn ) {
+                               function ( $uid ) use ( $that, $conn ) {
                                        return $that->getJobFromUidInternal( $uid, $conn );
                                },
-                               array( 'accept' => function ( $job ) { return is_object( $job ); } )
+                               array( 'accept' => function ( $job ) {
+                                       return is_object( $job );
+                               } )
                        );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -535,12 +554,15 @@ LUA;
                $conn = $this->getConnection();
                try {
                        $that = $this;
+
                        return new MappedIterator( // delayed jobs
                                $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ),
-                               function( $uid ) use ( $that, $conn ) {
+                               function ( $uid ) use ( $that, $conn ) {
                                        return $that->getJobFromUidInternal( $uid, $conn );
                                },
-                               array( 'accept' => function ( $job ) { return is_object( $job ); } )
+                               array( 'accept' => function ( $job ) {
+                                       return is_object( $job );
+                               } )
                        );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -573,6 +595,7 @@ LUA;
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
+
                return $sizes;
        }
 
@@ -582,7 +605,7 @@ LUA;
         * @param $uid string
         * @param $conn RedisConnRef
         * @return Job|bool Returns false if the job does not exist
-        * @throws MWException
+        * @throws MWException|JobQueueError
         */
        public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
                try {
@@ -597,6 +620,7 @@ LUA;
                        $title = Title::makeTitle( $item['namespace'], $item['title'] );
                        $job = Job::factory( $item['type'], $title, $item['params'] );
                        $job->metadata['uuid'] = $item['uuid'];
+
                        return $job;
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
@@ -606,8 +630,8 @@ LUA;
        /**
         * Release any ready delayed jobs into the queue
         *
-        * @return integer Number of jobs released
-        * @throws MWException
+        * @return int Number of jobs released
+        * @throws JobQueueError
         */
        public function releaseReadyDelayedJobs() {
                $count = 0;
@@ -616,12 +640,13 @@ LUA;
                try {
                        static $script =
 <<<LUA
+                       local kDelayed, kUnclaimed = unpack(KEYS)
                        -- Get the list of ready delayed jobs, sorted by readiness
-                       local ids = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
+                       local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[1])
                        -- Migrate the jobs from the "delayed" set to the "unclaimed" list
                        for k,id in ipairs(ids) do
-                               redis.call('lPush',KEYS[2],id)
-                               redis.call('zRem',KEYS[1],id)
+                               redis.call('lPush',kUnclaimed,id)
+                               redis.call('zRem',kDelayed,id)
                        end
                        return #ids
 LUA;
@@ -643,8 +668,8 @@ LUA;
        /**
         * Recycle or destroy any jobs that have been claimed for too long
         *
-        * @return integer Number of jobs recycled/deleted
-        * @throws MWException
+        * @return int Number of jobs recycled/deleted
+        * @throws MWException|JobQueueError
         */
        public function recycleAndDeleteStaleJobs() {
                if ( $this->claimTTL <= 0 ) { // sanity
@@ -660,33 +685,34 @@ LUA;
                        $now = time();
                        static $script =
 <<<LUA
+                       local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned = unpack(KEYS)
                        local released,abandoned,pruned = 0,0,0
                        -- Get all non-dead jobs that have an expired claim on them.
                        -- The score for each item is the last claim timestamp (UNIX).
-                       local staleClaims = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
+                       local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
                        for k,id in ipairs(staleClaims) do
-                               local timestamp = redis.call('zScore',KEYS[1],id)
-                               local attempts = redis.call('hGet',KEYS[2],id)
+                               local timestamp = redis.call('zScore',kClaimed,id)
+                               local attempts = redis.call('hGet',kAttempts,id)
                                if attempts < ARGV[3] then
                                        -- Claim expired and retries left: re-enqueue the job
-                                       redis.call('lPush',KEYS[3],id)
-                                       redis.call('hIncrBy',KEYS[2],id,1)
+                                       redis.call('lPush',kUnclaimed,id)
+                                       redis.call('hIncrBy',kAttempts,id,1)
                                        released = released + 1
                                else
                                        -- Claim expired and no retries left: mark the job as dead
-                                       redis.call('zAdd',KEYS[5],timestamp,id)
+                                       redis.call('zAdd',kAbandoned,timestamp,id)
                                        abandoned = abandoned + 1
                                end
-                               redis.call('zRem',KEYS[1],id)
+                               redis.call('zRem',kClaimed,id)
                        end
                        -- Get all of the dead jobs that have been marked as dead for too long.
                        -- The score for each item is the last claim timestamp (UNIX).
-                       local deadClaims = redis.call('zRangeByScore',KEYS[5],0,ARGV[2])
+                       local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
                        for k,id in ipairs(deadClaims) do
                                -- Stale and out of retries: remove any traces of the job
-                               redis.call('zRem',KEYS[5],id)
-                               redis.call('hDel',KEYS[2],id)
-                               redis.call('hDel',KEYS[4],id)
+                               redis.call('zRem',kAbandoned,id)
+                               redis.call('hDel',kAttempts,id)
+                               redis.call('hDel',kData,id)
                                pruned = pruned + 1
                        end
                        return {released,abandoned,pruned}
@@ -718,7 +744,7 @@ LUA;
        }
 
        /**
-        * @return Array
+        * @return array
         */
        protected function doGetPeriodicTasks() {
                $tasks = array();
@@ -734,28 +760,29 @@ LUA;
                                'period' => 300 // 5 minutes
                        );
                }
+
                return $tasks;
        }
 
        /**
-        * @param $job Job
+        * @param Job $job
         * @return array
         */
        protected function getNewJobFields( Job $job ) {
                return array(
                        // Fields that describe the nature of the job
-                       'type'       => $job->getType(),
-                       'namespace'  => $job->getTitle()->getNamespace(),
-                       'title'      => $job->getTitle()->getDBkey(),
-                       'params'     => $job->getParams(),
+                       'type' => $job->getType(),
+                       'namespace' => $job->getTitle()->getNamespace(),
+                       'title' => $job->getTitle()->getDBkey(),
+                       'params' => $job->getParams(),
                        // Some jobs cannot run until a "release timestamp"
                        'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
                        // Additional job metadata
-                       'uuid'       => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
-                       'sha1'       => $job->ignoreDuplicates()
-                               ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
-                               : '',
-                       'timestamp'  => time() // UNIX timestamp
+                       'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
+                       'sha1' => $job->ignoreDuplicates()
+                                       ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
+                                       : '',
+                       'timestamp' => time() // UNIX timestamp
                );
        }
 
@@ -768,8 +795,10 @@ LUA;
                if ( $title ) {
                        $job = Job::factory( $fields['type'], $title, $fields['params'] );
                        $job->metadata['uuid'] = $fields['uuid'];
+
                        return $job;
                }
+
                return false;
        }
 
@@ -780,10 +809,12 @@ LUA;
        protected function serialize( array $fields ) {
                $blob = serialize( $fields );
                if ( $this->compression === 'gzip'
-                       && strlen( $blob ) >= 1024 && function_exists( 'gzdeflate' ) )
-               {
+                       && strlen( $blob ) >= 1024
+                       && function_exists( 'gzdeflate' )
+               ) {
                        $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' );
                        $blobz = serialize( $object );
+
                        return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
                } else {
                        return $blob;
@@ -803,20 +834,22 @@ LUA;
                                $fields = false;
                        }
                }
+
                return is_array( $fields ) ? $fields : false;
        }
 
        /**
         * Get a connection to the server that handles all sub-queues for this queue
         *
-        * @return Array (server name, Redis instance)
-        * @throws MWException
+        * @return RedisConnRef
+        * @throws JobQueueConnectionError
         */
        protected function getConnection() {
                $conn = $this->redisPool->getConnection( $this->server );
                if ( !$conn ) {
                        throw new JobQueueConnectionError( "Unable to connect to redis server." );
                }
+
                return $conn;
        }
 
@@ -824,7 +857,7 @@ LUA;
         * @param $server string
         * @param $conn RedisConnRef
         * @param $e RedisException
-        * @throws MWException
+        * @throws JobQueueError
         */
        protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
                $this->redisPool->handleException( $server, $conn, $e );