Merged redis queue periodic tasks into recyclePruneAndUndelayJobs()
authorAaron Schulz <aschulz@wikimedia.org>
Sun, 29 Dec 2013 21:17:35 +0000 (13:17 -0800)
committerOri.livneh <ori@wikimedia.org>
Wed, 5 Feb 2014 16:42:02 +0000 (16:42 +0000)
* This avoids a few extra round trips for queues with delayed jobs

Change-Id: I457512360a445c234cfeba7a716eedeb37273467

includes/job/JobQueueRedis.php

index 212871e..e0641b5 100644 (file)
@@ -300,7 +300,7 @@ LUA;
                // Push ready delayed jobs into the queue every 10 jobs to spread the load.
                // This is also done as a periodic task, but we don't want too much done at once.
                if ( $this->checkDelay && mt_rand( 0, 9 ) == 0 ) {
-                       $this->releaseReadyDelayedJobs();
+                       $this->recyclePruneAndUndelayJobs();
                }
 
                $conn = $this->getConnection();
@@ -309,7 +309,7 @@ LUA;
                                if ( $this->claimTTL > 0 ) {
                                        // Keep the claimed job list down for high-traffic queues
                                        if ( mt_rand( 0, 99 ) == 0 ) {
-                                               $this->recycleAndDeleteStaleJobs();
+                                               $this->recyclePruneAndUndelayJobs();
                                        }
                                        $blob = $this->popAndAcquireBlob( $conn );
                                } else {
@@ -326,7 +326,7 @@ LUA;
                                        continue;
                                }
 
-                               // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed
+                               // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
                                $job = $this->getJobFromFields( $item ); // may be false
                        } while ( !$job ); // job may be false if invalid
                } catch ( RedisException $e ) {
@@ -627,54 +627,14 @@ LUA;
                }
        }
 
-       /**
-        * Release any ready delayed jobs into the queue
-        *
-        * @return int Number of jobs released
-        * @throws JobQueueError
-        */
-       public function releaseReadyDelayedJobs() {
-               $count = 0;
-
-               $conn = $this->getConnection();
-               try {
-                       static $script =
-<<<LUA
-                       local kDelayed, kUnclaimed = unpack(KEYS)
-                       -- Get the list of ready delayed jobs, sorted by readiness
-                       local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[1])
-                       -- Migrate the jobs from the "delayed" set to the "unclaimed" list
-                       for k,id in ipairs(ids) do
-                               redis.call('lPush',kUnclaimed,id)
-                               redis.call('zRem',kDelayed,id)
-                       end
-                       return #ids
-LUA;
-                       $count += (int)$conn->luaEval( $script,
-                               array(
-                                       $this->getQueueKey( 'z-delayed' ), // KEYS[1]
-                                       $this->getQueueKey( 'l-unclaimed' ), // KEYS[2]
-                                       time() // ARGV[1]; max "delay until" UNIX timestamp
-                               ),
-                               2 # first two arguments are keys
-                       );
-               } catch ( RedisException $e ) {
-                       $this->throwRedisException( $this->server, $conn, $e );
-               }
-
-               return $count;
-       }
-
        /**
         * Recycle or destroy any jobs that have been claimed for too long
+        * and release any ready delayed jobs into the queue
         *
-        * @return int Number of jobs recycled/deleted
+        * @return int Number of jobs recycled/deleted/undelayed
         * @throws MWException|JobQueueError
         */
-       public function recycleAndDeleteStaleJobs() {
-               if ( $this->claimTTL <= 0 ) { // sanity
-                       throw new MWException( "Cannot recycle jobs since acknowledgements are disabled." );
-               }
+       public function recyclePruneAndUndelayJobs() {
                $count = 0;
                // For each job item that can be retried, we need to add it back to the
                // main queue and remove it from the list of currenty claimed job items.
@@ -685,8 +645,8 @@ LUA;
                        $now = time();
                        static $script =
 <<<LUA
-                       local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned = unpack(KEYS)
-                       local released,abandoned,pruned = 0,0,0
+                       local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
+                       local released,abandoned,pruned,undelayed = 0,0,0,0
                        -- Get all non-dead jobs that have an expired claim on them.
                        -- The score for each item is the last claim timestamp (UNIX).
                        local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
@@ -715,7 +675,15 @@ LUA;
                                redis.call('hDel',kData,id)
                                pruned = pruned + 1
                        end
-                       return {released,abandoned,pruned}
+                       -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
+                       local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
+                       -- Migrate the jobs from the "delayed" set to the "unclaimed" list
+                       for k,id in ipairs(ids) do
+                               redis.call('lPush',kUnclaimed,id)
+                               redis.call('zRem',kDelayed,id)
+                       end
+                       undelayed = #ids
+                       return {released,abandoned,pruned,undelayed}
 LUA;
                        $res = $conn->luaEval( $script,
                                array(
@@ -724,15 +692,17 @@ LUA;
                                        $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
                                        $this->getQueueKey( 'h-data' ), # KEYS[4]
                                        $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
+                                       $this->getQueueKey( 'z-delayed' ), # KEYS[6]
                                        $now - $this->claimTTL, # ARGV[1]
                                        $now - self::MAX_AGE_PRUNE, # ARGV[2]
-                                       $this->maxTries # ARGV[3]
+                                       $this->maxTries, # ARGV[3]
+                                       $now # ARGV[4]
                                ),
-                               5 # number of first argument(s) that are keys
+                               6 # number of first argument(s) that are keys
                        );
                        if ( $res ) {
-                               list( $released, $abandoned, $pruned ) = $res;
-                               $count += $released + $pruned;
+                               list( $released, $abandoned, $pruned, $undelayed ) = $res;
+                               $count += $released + $pruned + $undelayed;
                                JobQueue::incrStats( 'job-recycle', $this->type, $released );
                                JobQueue::incrStats( 'job-abandon', $this->type, $abandoned );
                        }
@@ -747,21 +717,19 @@ LUA;
         * @return array
         */
        protected function doGetPeriodicTasks() {
-               $tasks = array();
+               $periods = array( 3600 ); // standard cleanup (useful on config change)
                if ( $this->claimTTL > 0 ) {
-                       $tasks['recycleAndDeleteStaleJobs'] = array(
-                               'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
-                               'period' => ceil( $this->claimTTL / 2 )
-                       );
+                       $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
                }
                if ( $this->checkDelay ) {
-                       $tasks['releaseReadyDelayedJobs'] = array(
-                               'callback' => array( $this, 'releaseReadyDelayedJobs' ),
-                               'period' => 300 // 5 minutes
-                       );
+                       $periods[] = 300; // 5 minutes
                }
-
-               return $tasks;
+               return array(
+                       'recyclePruneAndUndelayJobs' => array(
+                               'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
+                               'period'   => max( min( $periods ), 30 ) // sanity
+                       )
+               );
        }
 
        /**