Doc: Reformat @params declaration
[lhc/web/wiklou.git] / includes / jobqueue / aggregator / JobQueueAggregatorRedis.php
index 2aec3c9..db9e764 100644 (file)
@@ -36,20 +36,20 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
        protected $servers;
 
        /**
-        * @params include:
+        * @param array $params Possible keys:
         *   - redisConfig  : An array of parameters to RedisConnectionPool::__construct().
         *   - redisServers : Array of server entries, the first being the primary and the
         *                    others being fallback servers. Each entry is either a hostname/port
         *                    combination or the absolute path of a UNIX socket.
         *                    If a hostname is specified but no port, the standard port number
         *                    6379 will be used. Required.
-        * @param array $params
         */
        protected function __construct( array $params ) {
                parent::__construct( $params );
                $this->servers = isset( $params['redisServers'] )
                        ? $params['redisServers']
                        : array( $params['redisServer'] ); // b/c
+               $params['redisConfig']['serializer'] = 'none';
                $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
        }
 
@@ -75,7 +75,10 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
                        return false;
                }
                try {
+                       $conn->multi( Redis::PIPELINE );
+                       $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
                        $conn->hSet( $this->getReadyQueueKey(), $this->encQueueName( $type, $wiki ), time() );
+                       $conn->exec();
 
                        return true;
                } catch ( RedisException $e ) {
@@ -91,18 +94,16 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
                        return array();
                }
                try {
-                       $conn->multi( Redis::PIPELINE );
-                       $conn->exists( $this->getReadyQueueKey() );
-                       $conn->hGetAll( $this->getReadyQueueKey() );
-                       list( $exists, $map ) = $conn->exec();
+                       $map = $conn->hGetAll( $this->getReadyQueueKey() );
 
-                       if ( $exists ) { // cache hit
+                       if ( is_array( $map ) && isset( $map['_epoch'] ) ) {
+                               unset( $map['_epoch'] ); // ignore
                                $pendingDBs = array(); // (type => list of wikis)
                                foreach ( $map as $key => $time ) {
                                        list( $type, $wiki ) = $this->dencQueueName( $key );
                                        $pendingDBs[$type][] = $wiki;
                                }
-                       } else { // cache miss
+                       } else {
                                // Avoid duplicated effort
                                $rand = wfRandomString( 32 );
                                $conn->multi( Redis::MULTI );
@@ -115,16 +116,19 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
 
                                $pendingDBs = $this->findPendingWikiQueues(); // (type => list of wikis)
 
-                               $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock
-
+                               $conn->multi( Redis::PIPELINE );
                                $now = time();
-                               $map = array();
+                               $map = array( '_epoch' => time() ); // dummy key for empty Redis collections
                                foreach ( $pendingDBs as $type => $wikis ) {
+                                       $conn->hSetNx( $this->getQueueTypesKey(), $type, 'enabled' );
                                        foreach ( $wikis as $wiki ) {
                                                $map[$this->encQueueName( $type, $wiki )] = $now;
                                        }
                                }
                                $conn->hMSet( $this->getReadyQueueKey(), $map );
+                               $conn->exec();
+
+                               $conn->delete( $this->getReadyQueueKey() . ":lock" ); // unlock
                        }
 
                        return $pendingDBs;
@@ -142,6 +146,7 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
                }
                try {
                        $conn->delete( $this->getReadyQueueKey() );
+                       // leave key at getQueueTypesKey() alone
                } catch ( RedisException $e ) {
                        $this->handleException( $conn, $e );
 
@@ -182,7 +187,14 @@ class JobQueueAggregatorRedis extends JobQueueAggregator {
         * @return string
         */
        private function getReadyQueueKey() {
-               return "jobqueue:aggregator:h-ready-queues:v1"; // global
+               return "jobqueue:aggregator:h-ready-queues:v2"; // global
+       }
+
+       /**
+        * @return string
+        */
+       private function getQueueTypesKey() {
+               return "jobqueue:aggregator:h-queue-types:v2"; // global
        }
 
        /**