Avoid needing config to be in sync between MW and the jobrunner
author Aaron Schulz <aschulz@wikimedia.org>
Thu, 9 Oct 2014 00:40:23 +0000 (17:40 -0700)
committer Tim Starling <tstarling@wikimedia.org>
Fri, 17 Oct 2014 23:03:53 +0000 (23:03 +0000)
* This unifies the retry and no-retry code paths so that it's easier to use
  the separate job runner service.
* All jobs will use the ack procedure. If no retries are enabled, they will
  just get deleted. As a side effect, abandoned jobs can be seen for a few
  days in the no-retry case, which did not happen before. This could actually
  be useful to know about anyway.

Change-Id: I185092e3696fb336b9edcf19280dcd9a561161d9

includes/jobqueue/JobQueueRedis.php

index 3519eac..3ac5cf4 100644 (file)
@@ -134,9 +134,6 @@ class JobQueueRedis extends JobQueue {
         * @throws JobQueueError
         */
        protected function doGetAcquiredCount() {
-               if ( $this->claimTTL <= 0 ) {
-                       return 0; // no acknowledgements
-               }
                $conn = $this->getConnection();
                try {
                        $conn->multi( Redis::PIPELINE );
@@ -172,9 +169,6 @@ class JobQueueRedis extends JobQueue {
         * @throws JobQueueError
         */
        protected function doGetAbandonedCount() {
-               if ( $this->claimTTL <= 0 ) {
-                       return 0; // no acknowledgements
-               }
                $conn = $this->getConnection();
                try {
                        return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
@@ -308,15 +302,11 @@ LUA;
                $conn = $this->getConnection();
                try {
                        do {
-                               if ( $this->claimTTL > 0 ) {
-                                       // Keep the claimed job list down for high-traffic queues
-                                       if ( mt_rand( 0, 99 ) == 0 ) {
-                                               $this->recyclePruneAndUndelayJobs();
-                                       }
-                                       $blob = $this->popAndAcquireBlob( $conn );
-                               } else {
-                                       $blob = $this->popAndDeleteBlob( $conn );
+                               // Keep the claimed job list down for high-traffic queues
+                               if ( !$this->daemonized && mt_rand( 0, 99 ) == 0 ) {
+                                       $this->recyclePruneAndUndelayJobs();
                                }
+                               $blob = $this->popAndAcquireBlob( $conn );
                                if ( !is_string( $blob ) ) {
                                        break; // no jobs; nothing to do
                                }
@@ -338,39 +328,6 @@ LUA;
                return $job;
        }
 
-       /**
-        * @param RedisConnRef $conn
-        * @return array Serialized string or false
-        * @throws RedisException
-        */
-       protected function popAndDeleteBlob( RedisConnRef $conn ) {
-               static $script =
-<<<LUA
-               local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
-               -- Pop an item off the queue
-               local id = redis.call('rpop',kUnclaimed)
-               if not id then return false end
-               -- Get the job data and remove it
-               local item = redis.call('hGet',kData,id)
-               redis.call('hDel',kData,id)
-               -- Allow new duplicates of this job
-               local sha1 = redis.call('hGet',kSha1ById,id)
-               if sha1 then redis.call('hDel',kIdBySha1,sha1) end
-               redis.call('hDel',kSha1ById,id)
-               -- Return the job data
-               return item
-LUA;
-               return $conn->luaEval( $script,
-                       array(
-                               $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
-                               $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
-                               $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
-                               $this->getQueueKey( 'h-data' ), # KEYS[4]
-                       ),
-                       4 # number of first argument(s) that are keys
-               );
-       }
-
        /**
         * @param RedisConnRef $conn
         * @return array Serialized string or false
@@ -416,36 +373,35 @@ LUA;
                if ( !isset( $job->metadata['uuid'] ) ) {
                        throw new MWException( "Job of type '{$job->getType()}' has no UUID." );
                }
-               if ( $this->claimTTL > 0 ) {
-                       $conn = $this->getConnection();
-                       try {
-                               static $script =
+
+               $conn = $this->getConnection();
+               try {
+                       static $script =
 <<<LUA
-                               local kClaimed, kAttempts, kData = unpack(KEYS)
-                               -- Unmark the job as claimed
-                               redis.call('zRem',kClaimed,ARGV[1])
-                               redis.call('hDel',kAttempts,ARGV[1])
-                               -- Delete the job data itself
-                               return redis.call('hDel',kData,ARGV[1])
+                       local kClaimed, kAttempts, kData = unpack(KEYS)
+                       -- Unmark the job as claimed
+                       redis.call('zRem',kClaimed,ARGV[1])
+                       redis.call('hDel',kAttempts,ARGV[1])
+                       -- Delete the job data itself
+                       return redis.call('hDel',kData,ARGV[1])
 LUA;
-                               $res = $conn->luaEval( $script,
-                                       array(
-                                               $this->getQueueKey( 'z-claimed' ), # KEYS[1]
-                                               $this->getQueueKey( 'h-attempts' ), # KEYS[2]
-                                               $this->getQueueKey( 'h-data' ), # KEYS[3]
-                                               $job->metadata['uuid'] # ARGV[1]
-                                       ),
-                                       3 # number of first argument(s) that are keys
-                               );
-
-                               if ( !$res ) {
-                                       wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
-
-                                       return false;
-                               }
-                       } catch ( RedisException $e ) {
-                               $this->throwRedisException( $conn, $e );
+                       $res = $conn->luaEval( $script,
+                               array(
+                                       $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+                                       $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+                                       $this->getQueueKey( 'h-data' ), # KEYS[3]
+                                       $job->metadata['uuid'] # ARGV[1]
+                               ),
+                               3 # number of first argument(s) that are keys
+                       );
+
+                       if ( !$res ) {
+                               wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+
+                               return false;
                        }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $conn, $e );
                }
 
                return true;
@@ -725,7 +681,7 @@ LUA;
                }
                $periods = array( 3600 ); // standard cleanup (useful on config change)
                if ( $this->claimTTL > 0 ) {
-                       $periods[] = ceil( $this->claimTTL / 2 ); // avoid bad timing
+                       $periods[] = ceil( $this->claimTTL / 2 ); // halved to avoid bad timing
                }
                if ( $this->checkDelay ) {
                        $periods[] = 300; // 5 minutes