Merge "New hook for filters on Special:Contributions form"
[lhc/web/wiklou.git] / includes / jobqueue / JobQueueRedis.php
index deb5aa5..eda3e9c 100644 (file)
@@ -26,8 +26,9 @@
  *
  * This is a faster and less resource-intensive job queue than JobQueueDB.
  * All data for a queue using this class is placed into one redis server.
+ * The mediawiki/services/jobrunner background service must be set up and running.
  *
- * There are eight main redis keys used to track jobs:
+ * There are eight main redis keys (per queue) used to track jobs:
  *   - l-unclaimed  : A list of job IDs used for ready unclaimed jobs
  *   - z-claimed    : A sorted set of (job ID, UNIX timestamp as score) used for job retries
  *   - z-abandoned  : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
  * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
  * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
  *
+ * The following keys are used to track queue states:
+ *   - s-queuesWithJobs : A set of all queues with non-abandoned jobs
+ *
+ * The background service takes care of undelaying, recycling, and pruning jobs as well as
+ * removing s-queuesWithJobs entries as queues empty.
+ *
  * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication.
  * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
  * All the keys are prefixed with the relevant wiki ID information.
@@ -179,7 +186,7 @@ class JobQueueRedis extends JobQueue {
 
        /**
         * @see JobQueue::doBatchPush()
-        * @param array $jobs
+        * @param IJobSpecification[] $jobs
         * @param int $flags
         * @return void
         * @throws JobQueueError
@@ -239,7 +246,8 @@ class JobQueueRedis extends JobQueue {
         * @throws RedisException
         */
        protected function pushBlobs( RedisConnRef $conn, array $items ) {
-               $args = array(); // ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
+               $args = array( $this->encodeQueueName() );
+               // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
                foreach ( $items as $item ) {
                        $args[] = (string)$item['uuid'];
                        $args[] = (string)$item['sha1'];
@@ -248,10 +256,17 @@ class JobQueueRedis extends JobQueue {
                }
                static $script =
 <<<LUA
-               local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData = unpack(KEYS)
-               if #ARGV % 4 ~= 0 then return redis.error_reply('Unmatched arguments') end
+               local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
+               -- First argument is the queue ID
+               local queueId = ARGV[1]
+               -- Next arguments all come in 4s (one per job)
+               local variadicArgCount = #ARGV - 1
+               if variadicArgCount % 4 ~= 0 then
+                       return redis.error_reply('Unmatched arguments')
+               end
+               -- Insert each job into this queue as needed
                local pushed = 0
-               for i = 1,#ARGV,4 do
+               for i = 2,#ARGV,4 do
                        local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
                        if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
                                if 1*rtimestamp > 0 then
@@ -269,6 +284,8 @@ class JobQueueRedis extends JobQueue {
                                pushed = pushed + 1
                        end
                end
+               -- Mark this queue as having jobs
+               redis.call('sAdd',kQwJobs,queueId)
                return pushed
 LUA;
                return $conn->luaEval( $script,
@@ -279,10 +296,11 @@ LUA;
                                        $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
                                        $this->getQueueKey( 'z-delayed' ), # KEYS[4]
                                        $this->getQueueKey( 'h-data' ), # KEYS[5]
+                                       $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
                                ),
                                $args
                        ),
-                       5 # number of first argument(s) that are keys
+                       6 # number of first argument(s) that are keys
                );
        }
 
@@ -328,15 +346,18 @@ LUA;
                static $script =
 <<<LUA
                local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
+               local rTime = unpack(ARGV)
                -- Pop an item off the queue
                local id = redis.call('rPop',kUnclaimed)
-               if not id then return false end
+               if not id then
+                       return false
+               end
                -- Allow new duplicates of this job
                local sha1 = redis.call('hGet',kSha1ById,id)
                if sha1 then redis.call('hDel',kIdBySha1,sha1) end
                redis.call('hDel',kSha1ById,id)
                -- Mark the jobs as claimed and return it
-               redis.call('zAdd',kClaimed,ARGV[1],id)
+               redis.call('zAdd',kClaimed,rTime,id)
                redis.call('hIncrBy',kAttempts,id,1)
                return redis.call('hGet',kData,id)
 LUA;
@@ -372,11 +393,12 @@ LUA;
                        static $script =
 <<<LUA
                        local kClaimed, kAttempts, kData = unpack(KEYS)
+                       local uuid = unpack(ARGV)
                        -- Unmark the job as claimed
-                       redis.call('zRem',kClaimed,ARGV[1])
-                       redis.call('hDel',kAttempts,ARGV[1])
+                       redis.call('zRem',kClaimed,uuid)
+                       redis.call('hDel',kAttempts,uuid)
                        -- Delete the job data itself
-                       return redis.call('hDel',kData,ARGV[1])
+                       return redis.call('hDel',kData,uuid)
 LUA;
                        $res = $conn->luaEval( $script,
                                array(
@@ -472,7 +494,10 @@ LUA;
                                $keys[] = $this->getQueueKey( $prop );
                        }
 
-                       return ( $conn->delete( $keys ) !== false );
+                       $ok = ( $conn->delete( $keys ) !== false );
+                       $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );
+
+                       return $ok;
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $conn, $e );
                }
@@ -623,6 +648,27 @@ LUA;
                }
        }
 
+       /**
+        * @return array List of (wiki,type) tuples for queues with non-abandoned jobs
+        * @throws JobQueueConnectionError
+        * @throws JobQueueError
+        */
+       public function getServerQueuesWithJobs() {
+               $queues = array();
+
+               $conn = $this->getConnection();
+               try {
+                       $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
+                       foreach ( $set as $queue ) {
+                               $queues[] = $this->decodeQueueName( $queue );
+                       }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $conn, $e );
+               }
+
+               return $queues;
+       }
+
        /**
         * @param IJobSpecification $job
         * @return array
@@ -639,7 +685,7 @@ LUA;
                        // Additional job metadata
                        'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
                        'sha1' => $job->ignoreDuplicates()
-                               ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
+                               ? Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
                                : '',
                        'timestamp' => time() // UNIX timestamp
                );
@@ -703,7 +749,8 @@ LUA;
        protected function getConnection() {
                $conn = $this->redisPool->getConnection( $this->server );
                if ( !$conn ) {
-                       throw new JobQueueConnectionError( "Unable to connect to redis server." );
+                       throw new JobQueueConnectionError(
+                               "Unable to connect to redis server {$this->server}." );
                }
 
                return $conn;
@@ -720,25 +767,48 @@ LUA;
        }
 
        /**
-        * @param string $prop
-        * @param string|null $type
+        * @return string JSON
+        */
+       private function encodeQueueName() {
+               return json_encode( array( $this->type, $this->wiki ) );
+       }
+
+       /**
+        * @param string $name JSON
+        * @return array (type, wiki)
+        */
+       private function decodeQueueName( $name ) {
+               return json_decode( $name );
+       }
+
+       /**
+        * @param string $name
         * @return string
         */
-       private function getQueueKey( $prop, $type = null ) {
-               $type = is_string( $type ) ? $type : $this->type;
-               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
-               if ( strlen( $this->key ) ) { // namespaced queue (for testing)
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
-               } else {
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
+       private function getGlobalKey( $name ) {
+               $parts = array( 'global', 'jobqueue', $name );
+               foreach ( $parts as $part ) {
+                   if ( !preg_match( '/[a-zA-Z0-9_-]+/', $part ) ) {
+                       throw new InvalidArgumentException( "Key part characters are out of range." );
+                   }
                }
+
+               return implode( ':', $parts );
        }
 
        /**
-        * @param string $key
-        * @return void
+        * @param string $prop
+        * @param string|null $type Override this for sibling queues
+        * @return string
         */
-       public function setTestingPrefix( $key ) {
-               $this->key = $key;
+       private function getQueueKey( $prop, $type = null ) {
+               $type = is_string( $type ) ? $type : $this->type;
+               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+               $keyspace = $prefix ? "$db-$prefix" : $db;
+
+               $parts = array( $keyspace, 'jobqueue', $type, $prop );
+
+               // Parts are typically ASCII, but encode for sanity to escape ":"
+               return implode( ':', array_map( 'rawurlencode', $parts ) );
        }
 }