Merge "Delete maintenance/language/zhtable/trad2simp_supp_unset.manual"
[lhc/web/wiklou.git] / includes / job / JobQueueRedis.php
index 57189a5..67bb5a4 100644 (file)
@@ -70,7 +70,7 @@ class JobQueueRedis extends JobQueue {
        /**
         * @params include:
         *   - redisConfig : An array of parameters to RedisConnectionPool::__construct().
-        *                   Note that the serializer option is ignored "none" is always used.
+        *                   Note that the serializer option is ignored as "none" is always used.
         *   - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
         *                   If a hostname is specified but no port, the standard port number
         *                   6379 will be used. Required.
@@ -501,7 +501,7 @@ LUA;
                        foreach ( $props as $prop ) {
                                $keys[] = $this->getQueueKey( $prop );
                        }
-                       $res = ( $conn->delete( $keys ) !== false );
+                       return ( $conn->delete( $keys ) !== false );
                } catch ( RedisException $e ) {
                        $this->throwRedisException( $this->server, $conn, $e );
                }
@@ -547,6 +547,35 @@ LUA;
                }
        }
 
+       public function getCoalesceLocationInternal() {
+               return "RedisServer:" . $this->server;
+       }
+
+       protected function doGetSiblingQueuesWithJobs( array $types ) {
+               return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
+       }
+
+       protected function doGetSiblingQueueSizes( array $types ) {
+               $sizes = array(); // (type => size)
+               $types = array_values( $types ); // reindex
+               try {
+                       $conn = $this->getConnection();
+                       $conn->multi( Redis::PIPELINE );
+                       foreach ( $types as $type ) {
+                               $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
+                       }
+                       $res = $conn->exec();
+                       if ( is_array( $res ) ) {
+                               foreach ( $res as $i => $size ) {
+                                       $sizes[$types[$i]] = $size;
+                               }
+                       }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+               return $sizes;
+       }
+
        /**
         * This function should not be called outside JobQueueRedis
         *
@@ -804,14 +833,16 @@ LUA;
 
        /**
         * @param $prop string
+        * @param $type string|null
         * @return string
         */
-       private function getQueueKey( $prop ) {
+       private function getQueueKey( $prop, $type = null ) {
+               $type = is_string( $type ) ? $type : $this->type;
                list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
                if ( strlen( $this->key ) ) { // namespaced queue (for testing)
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $this->key, $prop );
+                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $this->key, $prop );
                } else {
-                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
+                       return wfForeignMemcKey( $db, $prefix, 'jobqueue', $type, $prop );
                }
        }