Only support "daemonized" mode for redis job queues to avoid code duplication
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 23 Jan 2015 21:00:49 +0000 (13:00 -0800)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 10 Feb 2015 20:18:13 +0000 (12:18 -0800)
Change-Id: I83bf61734fc48feea8a2f93adfe70f7d3a066b9e

RELEASE-NOTES-1.25
includes/jobqueue/JobQueueRedis.php

index 3b7eef3..3f24db0 100644 (file)
@@ -42,6 +42,9 @@ production.
   background with white fallback color, rather than just white background.
  * MediaWikiBagOStuff class removed, make sure any object cache config
    uses SqlBagOStuff instead.
+* The 'daemonized' flag must be set to true in $wgJobTypeConf for any redis
+  job queues. This means that mediawiki/services/jobrunner service has to
+  be installed and running for any such queues to work.
 
 === New features in 1.25 ===
 * (T64861) Updated plural rules to CLDR 26. Includes incompatible changes
index 9368fbf..243fec9 100644 (file)
@@ -64,8 +64,6 @@ class JobQueueRedis extends JobQueue {
        protected $server;
        /** @var string Compression method to use */
        protected $compression;
-       /** @var bool */
-       protected $daemonized;
 
        const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
 
@@ -90,7 +88,11 @@ class JobQueueRedis extends JobQueue {
                $this->server = $params['redisServer'];
                $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
                $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
-               $this->daemonized = !empty( $params['daemonized'] );
+               if ( empty( $params['daemonized'] ) ) {
+                       throw new Exception(
+                               "Non-daemonized mode is no longer supported. Please install the " .
+                               "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
+               }
                $this->checkDelay = true; // always enabled
        }
 
@@ -291,19 +293,9 @@ LUA;
        protected function doPop() {
                $job = false;
 
-               // Push ready delayed jobs into the queue every 10 jobs to spread the load.
-               // This is also done as a periodic task, but we don't want too much done at once.
-               if ( !$this->daemonized && mt_rand( 0, 9 ) == 0 ) {
-                       $this->recyclePruneAndUndelayJobs();
-               }
-
                $conn = $this->getConnection();
                try {
                        do {
-                               // Keep the claimed job list down for high-traffic queues
-                               if ( !$this->daemonized && mt_rand( 0, 99 ) == 0 ) {
-                                       $this->recyclePruneAndUndelayJobs();
-                               }
                                $blob = $this->popAndAcquireBlob( $conn );
                                if ( !is_string( $blob ) ) {
                                        break; // no jobs; nothing to do
@@ -316,7 +308,7 @@ LUA;
                                        continue;
                                }
 
-                               // If $item is invalid, recyclePruneAndUndelayJobs() will cleanup as needed
+                               // If $item is invalid, the runner loop recyling will cleanup as needed
                                $job = $this->getJobFromFields( $item ); // may be false
                        } while ( !$job ); // job may be false if invalid
                } catch ( RedisException $e ) {
@@ -583,112 +575,11 @@ LUA;
                }
        }
 
-       /**
-        * Recycle or destroy any jobs that have been claimed for too long
-        * and release any ready delayed jobs into the queue
-        *
-        * @return int Number of jobs recycled/deleted/undelayed
-        * @throws MWException|JobQueueError
-        */
-       public function recyclePruneAndUndelayJobs() {
-               $count = 0;
-               // For each job item that can be retried, we need to add it back to the
-               // main queue and remove it from the list of currenty claimed job items.
-               // For those that cannot, they are marked as dead and kept around for
-               // investigation and manual job restoration but are eventually deleted.
-               $conn = $this->getConnection();
-               try {
-                       $now = time();
-                       static $script =
-<<<LUA
-                       local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned, kDelayed = unpack(KEYS)
-                       local released,abandoned,pruned,undelayed = 0,0,0,0
-                       -- Get all non-dead jobs that have an expired claim on them.
-                       -- The score for each item is the last claim timestamp (UNIX).
-                       local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
-                       for k,id in ipairs(staleClaims) do
-                               local timestamp = redis.call('zScore',kClaimed,id)
-                               local attempts = redis.call('hGet',kAttempts,id)
-                               if attempts < ARGV[3] then
-                                       -- Claim expired and retries left: re-enqueue the job
-                                       redis.call('lPush',kUnclaimed,id)
-                                       released = released + 1
-                               else
-                                       -- Claim expired and no retries left: mark the job as dead
-                                       redis.call('zAdd',kAbandoned,timestamp,id)
-                                       abandoned = abandoned + 1
-                               end
-                               redis.call('zRem',kClaimed,id)
-                       end
-                       -- Get all of the dead jobs that have been marked as dead for too long.
-                       -- The score for each item is the last claim timestamp (UNIX).
-                       local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
-                       for k,id in ipairs(deadClaims) do
-                               -- Stale and out of retries: remove any traces of the job
-                               redis.call('zRem',kAbandoned,id)
-                               redis.call('hDel',kAttempts,id)
-                               redis.call('hDel',kData,id)
-                               pruned = pruned + 1
-                       end
-                       -- Get the list of ready delayed jobs, sorted by readiness (UNIX timestamp)
-                       local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[4])
-                       -- Migrate the jobs from the "delayed" set to the "unclaimed" list
-                       for k,id in ipairs(ids) do
-                               redis.call('lPush',kUnclaimed,id)
-                               redis.call('zRem',kDelayed,id)
-                       end
-                       undelayed = #ids
-                       return {released,abandoned,pruned,undelayed}
-LUA;
-                       $res = $conn->luaEval( $script,
-                               array(
-                                       $this->getQueueKey( 'z-claimed' ), # KEYS[1]
-                                       $this->getQueueKey( 'h-attempts' ), # KEYS[2]
-                                       $this->getQueueKey( 'l-unclaimed' ), # KEYS[3]
-                                       $this->getQueueKey( 'h-data' ), # KEYS[4]
-                                       $this->getQueueKey( 'z-abandoned' ), # KEYS[5]
-                                       $this->getQueueKey( 'z-delayed' ), # KEYS[6]
-                                       $now - $this->claimTTL, # ARGV[1]
-                                       $now - self::MAX_AGE_PRUNE, # ARGV[2]
-                                       $this->maxTries, # ARGV[3]
-                                       $now # ARGV[4]
-                               ),
-                               6 # number of first argument(s) that are keys
-                       );
-                       if ( $res ) {
-                               list( $released, $abandoned, $pruned, $undelayed ) = $res;
-                               $count += $released + $pruned + $undelayed;
-                               JobQueue::incrStats( 'job-recycle', $this->type, $released, $this->wiki );
-                               JobQueue::incrStats( 'job-abandon', $this->type, $abandoned, $this->wiki );
-                               JobQueue::incrStats( 'job-undelay', $this->type, $undelayed, $this->wiki );
-                       }
-               } catch ( RedisException $e ) {
-                       $this->throwRedisException( $conn, $e );
-               }
-
-               return $count;
-       }
-
        /**
         * @return array
         */
        protected function doGetPeriodicTasks() {
-               if ( $this->daemonized ) {
-                       return array(); // managed in the runner loop
-               }
-               $periods = array( 300 ); // 5 min; delayed/stale jobs
-               if ( $this->claimTTL > 0 ) {
-                       $periods[] = ceil( $this->claimTTL / 2 ); // halved to avoid bad timing
-               }
-               $period = min( $periods );
-               $period = max( $period, 30 ); // sanity
-
-               return array(
-                       'recyclePruneAndUndelayJobs' => array(
-                               'callback' => array( $this, 'recyclePruneAndUndelayJobs' ),
-                               'period'   => $period,
-                       )
-               );
+               return array(); // managed in the runner loop
        }
 
        /**