Made Lua code in JobQueueRedis more readable with unpack()
authorAaron Schulz <aschulz@wikimedia.org>
Fri, 8 Nov 2013 18:16:25 +0000 (10:16 -0800)
committerTim Starling <tstarling@wikimedia.org>
Mon, 2 Dec 2013 03:01:51 +0000 (03:01 +0000)
Change-Id: I52fb765a153acc7f16baa804ab2342e9b5dd9fd6

includes/job/JobQueueRedis.php

index e8c475d..5c3c5e9 100644 (file)
@@ -251,23 +251,24 @@ class JobQueueRedis extends JobQueue {
                }
                static $script =
 <<<LUA
+               local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
                if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
                local pushed = 0
                for i = 1,#ARGV,4 do
                        local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
-                       if sha1 == '' or redis.call('hExists',KEYS[3],sha1) == 0 then
+                       if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
                                if 1*rtimestamp > 0 then
                                        -- Insert into delayed queue (release time as score)
-                                       redis.call('zAdd',KEYS[4],rtimestamp,id)
+                                       redis.call('zAdd',kDelayed,rtimestamp,id)
                                else
                                        -- Insert into unclaimed queue
-                                       redis.call('lPush',KEYS[1],id)
+                                       redis.call('lPush',kUnclaimed,id)
                                end
                                if sha1 ~= '' then
-                                       redis.call('hSet',KEYS[2],id,sha1)
-                                       redis.call('hSet',KEYS[3],sha1,id)
+                                       redis.call('hSet',kSha1ById,id,sha1)
+                                       redis.call('hSet',kIdBySha1,sha1,id)
                                end
-                               redis.call('hSet',KEYS[5],id,blob)
+                               redis.call('hSet',kData,id,blob)
                                pushed = pushed + 1
                        end
                end
@@ -343,16 +344,17 @@ LUA;
        protected function popAndDeleteBlob( RedisConnRef $conn ) {
                static $script =
 <<<LUA
+               local kUnclaimed, kSha1ById, kIdBySha1, kData = unpack(KEYS)
                -- Pop an item off the queue
-               local id = redis.call('rpop',KEYS[1])
+               local id = redis.call('rpop',kUnclaimed)
                if not id then return false end
                -- Get the job data and remove it
-               local item = redis.call('hGet',KEYS[4],id)
-               redis.call('hDel',KEYS[4],id)
+               local item = redis.call('hGet',kData,id)
+               redis.call('hDel',kData,id)
                -- Allow new duplicates of this job
-               local sha1 = redis.call('hGet',KEYS[2],id)
-               if sha1 then redis.call('hDel',KEYS[3],sha1) end
-               redis.call('hDel',KEYS[2],id)
+               local sha1 = redis.call('hGet',kSha1ById,id)
+               if sha1 then redis.call('hDel',kIdBySha1,sha1) end
+               redis.call('hDel',kSha1ById,id)
                -- Return the job data
                return item
 LUA;
@@ -375,17 +377,18 @@ LUA;
        protected function popAndAcquireBlob( RedisConnRef $conn ) {
                static $script =
 <<<LUA
+               local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
                -- Pop an item off the queue
-               local id = redis.call('rPop',KEYS[1])
+               local id = redis.call('rPop',kUnclaimed)
                if not id then return false end
                -- Allow new duplicates of this job
-               local sha1 = redis.call('hGet',KEYS[2],id)
-               if sha1 then redis.call('hDel',KEYS[3],sha1) end
-               redis.call('hDel',KEYS[2],id)
+               local sha1 = redis.call('hGet',kSha1ById,id)
+               if sha1 then redis.call('hDel',kIdBySha1,sha1) end
+               redis.call('hDel',kSha1ById,id)
                -- Mark the jobs as claimed and return it
-               redis.call('zAdd',KEYS[4],ARGV[1],id)
-               redis.call('hIncrBy',KEYS[5],id,1)
-               return redis.call('hGet',KEYS[6],id)
+               redis.call('zAdd',kClaimed,ARGV[1],id)
+               redis.call('hIncrBy',kAttempts,id,1)
+               return redis.call('hGet',kData,id)
 LUA;
                return $conn->luaEval( $script,
                        array(
@@ -416,11 +419,12 @@ LUA;
                        try {
                                static $script =
 <<<LUA
+                               local kClaimed, kAttempts, kData = unpack(KEYS)
                                -- Unmark the job as claimed
-                               redis.call('zRem',KEYS[1],ARGV[1])
-                               redis.call('hDel',KEYS[2],ARGV[1])
+                               redis.call('zRem',kClaimed,ARGV[1])
+                               redis.call('hDel',kAttempts,ARGV[1])
                                -- Delete the job data itself
-                               return redis.call('hDel',KEYS[3],ARGV[1])
+                               return redis.call('hDel',kData,ARGV[1])
 LUA;
                                $res = $conn->luaEval( $script,
                                        array(
@@ -636,12 +640,13 @@ LUA;
                try {
                        static $script =
 <<<LUA
+                       local kDelayed, kUnclaimed = unpack(KEYS)
                        -- Get the list of ready delayed jobs, sorted by readiness
-                       local ids = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
+                       local ids = redis.call('zRangeByScore',kDelayed,0,ARGV[1])
                        -- Migrate the jobs from the "delayed" set to the "unclaimed" list
                        for k,id in ipairs(ids) do
-                               redis.call('lPush',KEYS[2],id)
-                               redis.call('zRem',KEYS[1],id)
+                               redis.call('lPush',kUnclaimed,id)
+                               redis.call('zRem',kDelayed,id)
                        end
                        return #ids
 LUA;
@@ -680,33 +685,34 @@ LUA;
                        $now = time();
                        static $script =
 <<<LUA
+                       local kClaimed, kAttempts, kUnclaimed, kData, kAbandoned = unpack(KEYS)
                        local released,abandoned,pruned = 0,0,0
                        -- Get all non-dead jobs that have an expired claim on them.
                        -- The score for each item is the last claim timestamp (UNIX).
-                       local staleClaims = redis.call('zRangeByScore',KEYS[1],0,ARGV[1])
+                       local staleClaims = redis.call('zRangeByScore',kClaimed,0,ARGV[1])
                        for k,id in ipairs(staleClaims) do
-                               local timestamp = redis.call('zScore',KEYS[1],id)
-                               local attempts = redis.call('hGet',KEYS[2],id)
+                               local timestamp = redis.call('zScore',kClaimed,id)
+                               local attempts = redis.call('hGet',kAttempts,id)
                                if attempts < ARGV[3] then
                                        -- Claim expired and retries left: re-enqueue the job
-                                       redis.call('lPush',KEYS[3],id)
-                                       redis.call('hIncrBy',KEYS[2],id,1)
+                                       redis.call('lPush',kUclaimed,id)
+                                       redis.call('hIncrBy',kAttempts,id,1)
                                        released = released + 1
                                else
                                        -- Claim expired and no retries left: mark the job as dead
-                                       redis.call('zAdd',KEYS[5],timestamp,id)
+                                       redis.call('zAdd',kAbandoned,timestamp,id)
                                        abandoned = abandoned + 1
                                end
-                               redis.call('zRem',KEYS[1],id)
+                               redis.call('zRem',kClaimed,id)
                        end
                        -- Get all of the dead jobs that have been marked as dead for too long.
                        -- The score for each item is the last claim timestamp (UNIX).
-                       local deadClaims = redis.call('zRangeByScore',KEYS[5],0,ARGV[2])
+                       local deadClaims = redis.call('zRangeByScore',kAbandoned,0,ARGV[2])
                        for k,id in ipairs(deadClaims) do
                                -- Stale and out of retries: remove any traces of the job
-                               redis.call('zRem',KEYS[5],id)
-                               redis.call('hDel',KEYS[2],id)
-                               redis.call('hDel',KEYS[4],id)
+                               redis.call('zRem',kAbandoned,id)
+                               redis.call('hDel',kAttempts,id)
+                               redis.call('hDel',kData,id)
                                pruned = pruned + 1
                        end
                        return {released,abandoned,pruned}