[JobQueue] Added a JobQueueRedis class.
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 12 Dec 2012 00:04:03 +0000 (16:04 -0800)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 22 Jan 2013 19:09:58 +0000 (11:09 -0800)
* The Job class now includes an extra "metadata" field now for usage
  by the queue classes. This is useful for stores that lack IDs.
* A RedisConnectionPool class was added for reusability.

Change-Id: Ie72536240e27fe1caf11096cdc4c85dfa821d181

includes/AutoLoader.php
includes/clientpool/RedisConnectionPool.php [new file with mode: 0644]
includes/job/Job.php
includes/job/JobQueueRedis.php [new file with mode: 0644]
includes/job/jobs/DuplicateJob.php

index 5eb497d..f4552f4 100644 (file)
@@ -449,6 +449,10 @@ $wgAutoloadLocalClasses = array(
        'TitleDependency' => 'includes/cache/CacheDependency.php',
        'TitleListDependency' => 'includes/cache/CacheDependency.php',
 
+       # includes/clientpool
+       'RedisConnectionPool' => 'includes/clientpool/RedisConnectionPool.php',
+       'RedisConnRef' => 'includes/clientpool/RedisConnectionPool.php',
+
        # includes/context
        'ContextSource' => 'includes/context/ContextSource.php',
        'DerivativeContext' => 'includes/context/DerivativeContext.php',
@@ -653,6 +657,7 @@ $wgAutoloadLocalClasses = array(
        'JobQueue' => 'includes/job/JobQueue.php',
        'JobQueueDB' => 'includes/job/JobQueueDB.php',
        'JobQueueGroup' => 'includes/job/JobQueueGroup.php',
+       'JobQueueRedis' => 'includes/job/JobQueueRedis.php',
 
        # includes/job/jobs
        'DoubleRedirectJob' => 'includes/job/jobs/DoubleRedirectJob.php',
diff --git a/includes/clientpool/RedisConnectionPool.php b/includes/clientpool/RedisConnectionPool.php
new file mode 100644 (file)
index 0000000..95a0afa
--- /dev/null
@@ -0,0 +1,298 @@
+<?php
+/**
+ * PhpRedis client connection pooling manager.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @defgroup Redis Redis
+ * @author Aaron Schulz
+ */
+
+/**
+ * Helper class to manage redis connections using PhpRedis.
+ *
+ * This can be used to get handle wrappers that free the handle when the wrapper
+ * leaves scope. The maximum number of free handles (connections) is configurable.
+ * This provides an easy way to cache connection handles that may also have state,
+ * such as handle does between multi() and exec(), and without hoarding connections.
+ * The wrappers use PHP magic methods so that calling functions on them calls the
+ * function of the actual Redis object handle.
+ *
+ * @ingroup Redis
+ * @since 1.21
+ */
+class RedisConnectionPool {
+       // Settings for all connections in this pool
+       protected $connectTimeout; // string; connection timeout
+       protected $persistent; // bool; whether connections persist
+       protected $password; // string; plaintext auth password
+       protected $poolSize; // integer; maximum number of idle connections
+       protected $serializer; // integer; the serializer to use (Redis::SERIALIZER_*)
+
+       protected $idlePoolSize = 0; // integer; current idle pool size
+
+       /** @var Array (server name => ((connection info array),...) */
+       protected $connections = array();
+       /** @var Array (server name => UNIX timestamp) */
+       protected $downServers = array();
+
+       /** @var Array */
+       protected static $instances = array(); // (pool ID => RedisConnectionPool)
+
+       const SERVER_DOWN_TTL = 30; // integer; seconds to cache servers as "down"
+
+       /**
+        * $options include:
+        *   - connectTimeout : The timeout for new connections, in seconds.
+        *                      Optional, default is 1 second.
+        *   - persistent     : Set this to true to allow connections to persist across
+        *                      multiple web requests. False by default.
+        *   - poolSize       : Maximim number of idle connections. Default is 5.
+        *   - password       : The authentication password, will be sent to Redis in clear text.
+        *                      Optional, if it is unspecified, no AUTH command will be sent.
+        *   - serializer     : Set to "php" or "igbinary". Default is "php".
+        * @param array $options
+        */
+       protected function __construct( array $options ) {
+               if ( !extension_loaded( 'redis' ) ) {
+                       throw new MWException( __CLASS__. ' requires the phpredis extension: ' .
+                               'https://github.com/nicolasff/phpredis' );
+               }
+               $this->connectTimeout = isset( $options['connectTimeout'] )
+                       ? $options['connectTimeout']
+                       : 1;
+               $this->persistent = isset( $options['persistent'] )
+                       ? $options['persistent']
+                       : false;
+               $this->password = isset( $options['password'] )
+                       ? $options['password']
+                       : '';
+               $this->poolSize = isset( $options['poolSize'] )
+                       ? $options['poolSize']
+                       : 5;
+               if ( !isset( $options['serializer'] ) || $options['serializer'] === 'php' ) {
+                       $this->serializer = Redis::SERIALIZER_PHP;
+               } elseif ( $options['serializer'] === 'igbinary' ) {
+                       $this->serializer = Redis::SERIALIZER_IGBINARY;
+               } else {
+                       throw new MWException( "Invalid serializer specified." );
+               }
+       }
+
+       /**
+        * @param $options Array
+        * @return RedisConnectionPool
+        */
+       public static function singleton( array $options ) {
+               $id = sha1( serialize( $options ) );
+               if ( !isset( self::$instances[$id] ) ) {
+                       self::$instances[$id] = new self( $options );
+                       wfDebug( "Creating a new " . __CLASS__ . " instance with id $id." );
+               }
+               return self::$instances[$id];
+       }
+
+       /**
+        * Get a connection to a redis server. Based on code in RedisBagOStuff.php.
+        *
+        * @param $server string A hostname/port combination or the absolute path of a UNIX socket.
+        *                       If a hostname is specified but no port, port 6379 will be used.
+        * @return RedisConnRef|bool Returns false on failure
+        * @throws MWException
+        */
+       public function getConnection( $server ) {
+               // Check the listing "dead" servers which have had a connection errors.
+               // Servers are marked dead for a limited period of time, to
+               // avoid excessive overhead from repeated connection timeouts.
+               if ( isset( $this->downServers[$server] ) ) {
+                       $now = time();
+                       if ( $now > $this->downServers[$server] ) {
+                               // Dead time expired
+                               unset( $this->downServers[$server] );
+                       } else {
+                               // Server is dead
+                               wfDebug( "server $server is marked down for another " .
+                                       ( $this->downServers[$server] - $now ) . " seconds, can't get connection" );
+                               return false;
+                       }
+               }
+
+               // Check if a connection is already free for use
+               if ( isset( $this->connections[$server] ) ) {
+                       foreach ( $this->connections[$server] as &$connection ) {
+                               if ( $connection['free'] ) {
+                                       $connection['free'] = false;
+                                       --$this->idlePoolSize;
+                                       return new RedisConnRef( $this, $server, $connection['conn'] );
+                               }
+                       }
+               }
+
+               if ( substr( $server, 0, 1 ) === '/' ) {
+                       // UNIX domain socket
+                       // These are required by the redis extension to start with a slash, but
+                       // we still need to set the port to a special value to make it work.
+                       $host = $server;
+                       $port = 0;
+               } else {
+                       // TCP connection
+                       $hostPort = IP::splitHostAndPort( $server );
+                       if ( !$hostPort ) {
+                               throw new MWException( __CLASS__.": invalid configured server \"$server\"" );
+                       }
+                       list( $host, $port ) = $hostPort;
+                       if ( $port === false ) {
+                               $port = 6379;
+                       }
+               }
+
+               $conn = new Redis();
+               try {
+                       if ( $this->persistent ) {
+                               $result = $conn->pconnect( $host, $port, $this->connectTimeout );
+                       } else {
+                               $result = $conn->connect( $host, $port, $this->connectTimeout );
+                       }
+                       if ( !$result ) {
+                               wfDebugLog( 'redis', "Could not connect to server $server" );
+                               // Mark server down for some time to avoid further timeouts
+                               $this->downServers[$server] = time() + self::SERVER_DOWN_TTL;
+                               return false;
+                       }
+                       if ( $this->password !== null ) {
+                               if ( !$conn->auth( $this->password ) ) {
+                                       wfDebugLog( 'redis', "Authentication error connecting to $server" );
+                               }
+                       }
+               } catch ( RedisException $e ) {
+                       $this->downServers[$server] = time() + self::SERVER_DOWN_TTL;
+                       wfDebugLog( 'redis', "Redis exception: " . $e->getMessage() . "\n" );
+                       return false;
+               }
+
+               if ( $conn ) {
+                       $conn->setOption( Redis::OPT_SERIALIZER, $this->serializer );
+                       $this->connections[$server][] = array( 'conn' => $conn, 'free' => false );
+                       return new RedisConnRef( $this, $server, $conn );
+               } else {
+                       return false;
+               }
+       }
+
+       /**
+        * Mark a connection to a server as free to return to the pool
+        *
+        * @param $server string
+        * @param $conn Redis
+        * @return boolean
+        */
+       public function freeConnection( $server, Redis $conn ) {
+               $found = false;
+
+               foreach ( $this->connections[$server] as &$connection ) {
+                       if ( $connection['conn'] === $conn && !$connection['free'] ) {
+                               $connection['free'] = true;
+                               ++$this->idlePoolSize;
+                               break;
+                       }
+               }
+
+               $this->closeExcessIdleConections();
+
+               return $found;
+       }
+
+       /**
+        * Close any extra idle connections if there are more than the limit
+        *
+        * @return void
+        */
+       protected function closeExcessIdleConections() {
+               if ( $this->idlePoolSize <= $this->poolSize ) {
+                       return; // nothing to do
+               }
+
+               foreach ( $this->connections as $server => &$serverConnections ) {
+                       foreach ( $serverConnections as $key => &$connection ) {
+                               if ( $connection['free'] ) {
+                                       unset( $serverConnections[$key] );
+                                       if ( --$this->idlePoolSize <= $this->poolSize ) {
+                                               return; // done
+                                       }
+                               }
+                       }
+               }
+       }
+
+       /**
+        * The redis extension throws an exception in response to various read, write
+        * and protocol errors. Sometimes it also closes the connection, sometimes
+        * not. The safest response for us is to explicitly destroy the connection
+        * object and let it be reopened during the next request.
+        *
+        * @param $server string
+        * @param $conn RedisConnRef
+        * @param $e RedisException
+        * @return void
+        */
+       public function handleException( $server, RedisConnRef $conn, RedisException $e ) {
+               wfDebugLog( 'redis',
+                       "Redis exception on server $server: " . $e->getMessage() . "\n" );
+               foreach ( $this->connections[$server] as $key => $connection ) {
+                       if ( $connection['conn'] === $conn ) {
+                               $this->idlePoolSize -= $connection['free'] ? 1 : 0;
+                               unset( $this->connections[$server][$key] );
+                               break;
+                       }
+               }
+       }
+}
+
+/**
+ * Helper class to handle automatically marking connectons as reusable (via RAII pattern)
+ *
+ * @ingroup Redis
+ * @since 1.21
+ */
+class RedisConnRef {
+       /** @var RedisConnectionPool */
+       protected $pool;
+
+       protected $server; // string
+
+       /** @var Redis */
+       protected $conn;
+
+       /**
+        * @param $pool RedisConnectionPool
+        * @param $server string
+        * @param $conn Redis
+        */
+       public function __construct( RedisConnectionPool $pool, $server, Redis $conn ) {
+               $this->pool = $pool;
+               $this->server = $server;
+               $this->conn = $conn;
+       }
+
+       public function __call( $name, $arguments ) {
+               return call_user_func_array( array( $this->conn, $name ), $arguments );
+       }
+
+       function __destruct() {
+               $this->pool->freeConnection( $this->server, $this->conn );
+       }
+}
index 927ca4e..c475694 100644 (file)
@@ -39,6 +39,9 @@ abstract class Job {
                $removeDuplicates,
                $error;
 
+       /** @var Array Additional queue metadata */
+       public $metadata = array();
+
        /*-------------------------------------------------------------------------
         * Abstract functions
         *------------------------------------------------------------------------*/
diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php
new file mode 100644 (file)
index 0000000..6c908a2
--- /dev/null
@@ -0,0 +1,610 @@
+<?php
+/**
+ * Redis-backed job queue code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @author Aaron Schulz
+ */
+
+/**
+ * Class to handle job queues stored in Redis
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueRedis extends JobQueue {
+       /** @var RedisConnectionPool */
+       protected $redisPool;
+
+       protected $server; // string; server address
+
+       const ROOTJOB_TTL   = 1209600; // integer; seconds to remember root jobs (14 days)
+       const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days)
+       const MAX_ATTEMPTS  = 3; // integer; number of times to try a job
+
+       /**
+        * @params include:
+        *   - redisConf : An array of parameters to RedisConnectionPool::__construct().
+        *   - server    : A hostname/port combination or the absolute path of a UNIX socket.
+        *                 If a hostname is specified but no port, the standard port number
+        *                 6379 will be used. Required.
+        * @param array $params
+        */
+       public function __construct( array $params ) {
+               parent::__construct( $params );
+               $this->server = $params['redisConf']['server'];
+               $this->redisPool = RedisConnectionPool::singleton( $params['redisConf'] );
+       }
+
+       /**
+        * @see JobQueue::doIsEmpty()
+        * @return bool
+        * @throws MWException
+        */
+       protected function doIsEmpty() {
+               if ( mt_rand( 0, 99 ) == 0 ) {
+                       $this->doInternalMaintenance();
+               }
+
+               $conn = $this->getConnection();
+               try {
+                       return ( $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ) == 0 );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
+       /**
+        * @see JobQueue::doGetSize()
+        * @return integer
+        * @throws MWException
+        */
+       protected function doGetSize() {
+               if ( mt_rand( 0, 99 ) == 0 ) {
+                       $this->doInternalMaintenance();
+               }
+
+               $conn = $this->getConnection();
+               try {
+                       return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
+       /**
+        * @see JobQueue::doGetAcquiredCount()
+        * @return integer
+        * @throws MWException
+        */
+       protected function doGetAcquiredCount() {
+               if ( mt_rand( 0, 99 ) == 0 ) {
+                       $this->doInternalMaintenance();
+               }
+
+               $conn = $this->getConnection();
+               try {
+                       if ( $this->claimTTL > 0 ) {
+                               return $conn->lSize( $this->getQueueKey( 'l-claimed' ) );
+                       } else {
+                               return 0;
+                       }
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
+       /**
+        * @see JobQueue::doBatchPush()
+        * @param array $jobs
+        * @param $flags
+        * @return bool
+        * @throws MWException
+        */
+       protected function doBatchPush( array $jobs, $flags ) {
+               if ( !count( $jobs ) ) {
+                       return true;
+               }
+
+               // Convert the jobs into a list of field maps
+               $items = array(); // (uid => job fields map)
+               foreach ( $jobs as $job ) {
+                       $item = $this->getNewJobFields( $job );
+                       $items[$item['uid']] = $item;
+               }
+
+               $dedupUids = array(); // list of uids to check for duplicates
+               foreach ( $items as $item ) {
+                       if ( $this->isHashUid( $item['uid'] ) ) { // hash identifier => de-duplicate
+                               $dedupUids[] = $item['uid'];
+                       }
+               }
+
+               $conn = $this->getConnection();
+               try {
+                       // Find which of these jobs are duplicates unclaimed jobs in the queue...
+                       if ( count( $dedupUids ) ) {
+                               $conn->multi( Redis::PIPELINE );
+                               foreach ( $dedupUids as $uid ) { // check if job data exists
+                                       $conn->exists( $this->prefixWithQueueKey( 'data', $uid ) );
+                               }
+                               if ( $this->claimTTL > 0 ) { // check which jobs were claimed
+                                       foreach ( $dedupUids as $uid ) {
+                                               $conn->hExists( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime' );
+                                       }
+                                       list( $exists, $claimed ) = array_chunk( $conn->exec(), count( $dedupUids ) );
+                               } else {
+                                       $exists = $conn->exec();
+                                       $claimed = array(); // no claim system
+                               }
+                               // Remove the duplicate jobs to cut down on pushing duplicate uids...
+                               foreach ( $dedupUids as $k => $uid ) {
+                                       if ( $exists[$k] && empty( $claimed[$k] ) ) {
+                                               unset( $items[$uid] );
+                                       }
+                               }
+                       }
+                       // Actually push the non-duplicate jobs into the queue...
+                       if ( count( $items ) ) {
+                               $uids = array_keys( $items );
+                               $conn->multi( Redis::MULTI ); // begin (atomic trx)
+                               $conn->mSet( $this->prefixKeysWithQueueKey( 'data', $items ) );
+                               call_user_func_array(
+                                       array( $conn, 'lPush' ),
+                                       array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), $uids )
+                               );
+                               $res = $conn->exec(); // commit (atomic trx)
+                               if ( in_array( false, $res, true ) ) {
+                                       wfDebugLog( 'JobQueueRedis', "Could not insert {$this->type} job(s)." );
+                                       return false;
+                               }
+                       }
+                       wfIncrStats( 'job-insert', count( $items ) );
+                       wfIncrStats( 'job-insert-duplicate', count( $jobs ) - count( $items ) );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+
+               return true;
+       }
+
+       /**
+        * @see JobQueue::doPop()
+        * @return Job|bool
+        * @throws MWException
+        */
+       protected function doPop() {
+               $job = false;
+
+               if ( mt_rand( 0, 99 ) == 0 ) {
+                       $this->doInternalMaintenance();
+               }
+
+               $conn = $this->getConnection();
+               try {
+                       do {
+                               // Atomically pop an item off the queue and onto the "claimed" list
+                               $uid = $conn->rpoplpush(
+                                       $this->getQueueKey( 'l-unclaimed' ),
+                                       $this->getQueueKey( 'l-claimed' )
+                               );
+                               if ( $uid === false ) {
+                                       break; // no jobs; nothing to do
+                               }
+
+                               wfIncrStats( 'job-pop' );
+                               $conn->multi( Redis::PIPELINE );
+                               $conn->get( $this->prefixWithQueueKey( 'data', $uid ) );
+                               if ( $this->claimTTL > 0 ) {
+                                       // Set the claim timestamp metadata. If this step fails, then
+                                       // the timestamp will be assumed to be the current timestamp by
+                                       // recycleAndDeleteStaleJobs() as of the next time that it runs.
+                                       // If two runners claim duplicate jobs, one will abort here.
+                                       $conn->hSetNx( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', time() );
+                               } else {
+                                       // If this fails, the message key will be deleted in cleanupClaimedJobs().
+                                       // If two runners claim duplicate jobs, one of them will abort here.
+                                       $conn->delete(
+                                               $this->prefixWithQueueKey( 'h-meta', $uid ),
+                                               $this->prefixWithQueueKey( 'data', $uid ) );
+                               }
+                               list( $item, $ok ) = $conn->exec();
+                               if ( $item === false || ( $this->claimTTL && !$ok ) ) {
+                                       wfDebug( "Could not find or delete job $uid; probably was a duplicate." );
+                                       continue; // job was probably a duplicate
+                               }
+
+                               // If $item is invalid, recycleAndDeleteStaleJobs() will cleanup as needed
+                               $job = $this->getJobFromFields( $item ); // may be false
+                       } while ( !$job ); // job may be false if invalid
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+
+               // Flag this job as an old duplicate based on its "root" job...
+               try {
+                       if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
+                               wfIncrStats( 'job-pop-duplicate' );
+                               return DuplicateJob::newFromJob( $job ); // convert to a no-op
+                       }
+               } catch ( MWException $e ) {} // don't lose jobs over this
+
+               return $job;
+       }
+
+       /**
+        * @see JobQueue::doAck()
+        * @param Job $job
+        * @return Job|bool
+        * @throws MWException
+        */
+       protected function doAck( Job $job ) {
+               if ( $this->claimTTL > 0 ) {
+                       $conn = $this->getConnection();
+                       try {
+                               // Get the exact field map this Job came from, regardless of whether
+                               // the job was transformed into a DuplicateJob or anything of the sort.
+                               $item = $job->metadata['sourceFields'];
+
+                               $conn->multi( Redis::MULTI ); // begin (atomic trx)
+                               // Remove the first instance of this job scanning right-to-left.
+                               // This is O(N) in the worst case, but is likely to be much faster since
+                               // jobs are pushed to the left and we are starting from the right, where
+                               // the longest running jobs are likely to be. These should be the first
+                               // jobs to be acknowledged assuming that job run times are roughly equal.
+                               $conn->lRem( $this->getQueueKey( 'l-claimed' ), $item['uid'], -1 );
+                               // Delete the job data and its claim metadata
+                               $conn->delete(
+                                       $this->prefixWithQueueKey( 'h-meta', $item['uid'] ),
+                                       $this->prefixWithQueueKey( 'data', $item['uid'] ) );
+                               $res = $conn->exec(); // commit (atomic trx)
+
+                               if ( in_array( false, $res, true ) ) {
+                                       wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." );
+                                       return false;
+                               }
+                       } catch ( RedisException $e ) {
+                               $this->throwRedisException( $this->server, $conn, $e );
+                       }
+               }
+               return true;
+       }
+
+       /**
+        * @see JobQueue::doDeduplicateRootJob()
+        * @param Job $job
+        * @return bool
+        * @throws MWException
+        */
+       protected function doDeduplicateRootJob( Job $job ) {
+               $params = $job->getParams();
+               if ( !isset( $params['rootJobSignature'] ) ) {
+                       throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+                       throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+               }
+               $key = $this->getRootJobKey( $params['rootJobSignature'] );
+
+               $conn = $this->getConnection();
+               try {
+                       $timestamp = $conn->get( $key ); // current last timestamp of this job
+                       if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+                               return true; // a newer version of this root job was enqueued
+                       }
+                       // Update the timestamp of the last root job started at the location...
+                       return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+       }
+
+       /**
+        * Check if the "root" job of a given job has been superseded by a newer one
+        *
+        * @param $job Job
+        * @return bool
+        * @throws MWException
+        */
+       protected function isRootJobOldDuplicate( Job $job ) {
+               $params = $job->getParams();
+               if ( !isset( $params['rootJobSignature'] ) ) {
+                       return false; // job has no de-deplication info
+               } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+                       wfDebugLog( 'JobQueueRedis', "Cannot check root job; missing 'rootJobTimestamp'." );
+                       return false;
+               }
+
+               $conn = $this->getConnection();
+               try {
+                       // Get the last time this root job was enqueued
+                       $timestamp = $conn->get( $this->getRootJobKey( $params['rootJobSignature'] ) );
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+
+               // Check if a new root job was started at the location after this one's...
+               return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
+       }
+
+       /**
+        * Do any job recycling or queue cleanup as needed
+        *
+        * @return void
+        * @return integer Number of jobs recycled/deleted
+        * @throws MWException
+        */
+       protected function doInternalMaintenance() {
+               return ( $this->claimTTL > 0 ) ?
+                       $this->recycleAndDeleteStaleJobs() : $this->cleanupClaimedJobs();
+       }
+
+       /**
+        * Recycle or destroy any jobs that have been claimed for too long
+        *
+        * @return integer Number of jobs recycled/deleted
+        * @throws MWException
+        */
+       protected function recycleAndDeleteStaleJobs() {
+               $count = 0;
+               // For each job item that can be retried, we need to add it back to the
+               // main queue and remove it from the list of currenty claimed job items.
+               $conn = $this->getConnection();
+               try {
+                       // Avoid duplicate insertions of items to be re-enqueued
+                       $conn->multi( Redis::MULTI );
+                       $conn->setnx( $this->getQueueKey( 'lock' ), 1 );
+                       $conn->expire( $this->getQueueKey( 'lock' ), 3600 );
+                       if ( $conn->exec() !== array( true, true ) ) { // lock
+                               return $count; // already in progress
+                       }
+
+                       $now = time();
+                       $claimCutoff = $now - $this->claimTTL;
+                       $pruneCutoff = $now - self::MAX_AGE_PRUNE;
+
+                       // Get the list of all claimed jobs
+                       $claimedUids = $conn->lRange( $this->getQueueKey( 'l-claimed' ), 0, -1 );
+                       // Get a map of (uid => claim metadata) for all claimed jobs
+                       $metadata = $conn->mGet( $this->prefixValuesWithQueueKey( 'h-meta', $claimedUids ) );
+
+                       $uidsPush = array(); // items IDs to move to the "unclaimed" queue
+                       $uidsRemove = array(); // item IDs to remove from "claimed" queue
+                       foreach ( $claimedUids as $i => $uid ) { // all claimed items
+                               $info = $metadata[$i] ? $metadata[$i] : array();
+                               if ( isset( $info['ctime'] ) || isset( $info['rctime'] ) ) {
+                                       // Prefer "ctime" (set by pop()) over "rctime" (set by this function)
+                                       $ctime = isset( $info['ctime'] ) ? $info['ctime'] : $info['rctime'];
+                                       // Claimed job claimed for too long?
+                                       if ( $ctime < $claimCutoff ) {
+                                               // Get the number of failed attempts
+                                               $attempts = isset( $info['attempts'] ) ? $info['attempts'] : 0;
+                                               if ( $attempts < self::MAX_ATTEMPTS ) {
+                                                       $uidsPush[] = $uid; // retry it
+                                               } elseif ( $ctime < $pruneCutoff ) {
+                                                       $uidsRemove[] = $uid; // just remove it
+                                               }
+                                       }
+                               } else {
+                                       // If pop() failed to set the claim timestamp, set it to the current time.
+                                       // Since that function sets this non-atomically *after* moving the job to
+                                       // the "claimed" queue, it may be the case that it just didn't set it yet.
+                                       $conn->hSet( $this->prefixWithQueueKey( 'h-meta', $uid ), 'rctime', $now );
+                               }
+                       }
+
+                       $conn->multi( Redis::MULTI ); // begin (atomic trx)
+                       if ( count( $uidsPush ) ) { // move from "l-claimed" to "l-unclaimed"
+                               call_user_func_array(
+                                       array( $conn, 'lPush' ),
+                                       array_merge( array( $this->getQueueKey( 'l-unclaimed' ) ), $uidsPush )
+                               );
+                               foreach ( $uidsPush as $uid ) {
+                                       $conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
+                                       $conn->hDel( $this->prefixWithQueueKey( 'h-meta', $uid ), 'ctime', 'rctime' );
+                                       $conn->hIncrBy( $this->prefixWithQueueKey( 'h-meta', $uid ), 'attempts', 1 );
+                               }
+                       }
+                       foreach ( $uidsRemove as $uid ) { // remove from "l-claimed"
+                               $conn->lRem( $this->getQueueKey( 'l-claimed' ), $uid, -1 );
+                               $conn->delete( // delete job data and metadata
+                                       $this->prefixWithQueueKey( 'h-meta', $uid ),
+                                       $this->prefixWithQueueKey( 'data', $uid ) );
+                       }
+                       $res = $conn->exec(); // commit (atomic trx)
+
+                       if ( in_array( false, $res, true ) ) {
+                               wfDebugLog( 'JobQueueRedis', "Could not recycle {$this->type} job(s)." );
+                       } else {
+                               $count += ( count( $uidsPush ) + count( $uidsRemove ) );
+                               wfIncrStats( 'job-recycle', count( $uidsPush ) );
+                       }
+
+                       $conn->delete( $this->getQueueKey( 'lock' ) ); // unlock
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+
+               return $count;
+       }
+
+       /**
+        * Destroy any jobs that have been claimed
+        *
+        * @return integer Number of jobs deleted
+        * @throws MWException
+        */
+       protected function cleanupClaimedJobs() {
+               $count = 0;
+               // Make sure the message for claimed jobs was deleted
+               // and remove the claimed job IDs from the "claimed" list.
+               $conn = $this->getConnection();
+               try {
+                       // Avoid races and duplicate effort
+                       $conn->multi( Redis::MULTI );
+                       $conn->setnx( $this->getQueueKey( 'lock' ), 1 );
+                       $conn->expire( $this->getQueueKey( 'lock' ), 3600 );
+                       if ( $conn->exec() !== array( true, true ) ) { // lock
+                               return $count; // already in progress
+                       }
+                       // Get the list of all claimed jobs
+                       $uids = $conn->lRange( $this->getQueueKey( 'l-claimed' ), 0, -1 );
+                       if ( count( $uids ) ) {
+                               // Delete the message keys and delist the corresponding ids.
+                               // Since the only other changes to "l-claimed" are left pushes, we can just strip
+                               // off the elements read here using a right trim based on the number of ids read.
+                               $conn->multi( Redis::MULTI ); // begin (atomic trx)
+                               $conn->lTrim( $this->getQueueKey( 'l-claimed' ), 0, -count( $uids ) - 1 );
+                               $conn->delete( array_merge(
+                                       $this->prefixValuesWithQueueKey( 'h-meta', $uids ),
+                                       $this->prefixValuesWithQueueKey( 'data', $uids )
+                               ) );
+                               $res = $conn->exec(); // commit (atomic trx)
+
+                               if ( in_array( false, $res, true ) ) {
+                                       wfDebugLog( 'JobQueueRedis', "Could not purge {$this->type} job(s)." );
+                               } else {
+                                       $count += count( $uids );
+                               }
+                       }
+                       $conn->delete( $this->getQueueKey( 'lock' ) ); // unlock
+               } catch ( RedisException $e ) {
+                       $this->throwRedisException( $this->server, $conn, $e );
+               }
+
+               return $count;
+       }
+
+       /**
+        * @param $job Job
+        * @return array
+        */
+       protected function getNewJobFields( Job $job ) {
+               return array(
+                       // Fields that describe the nature of the job
+                       'type'      => $job->getType(),
+                       'namespace' => $job->getTitle()->getNamespace(),
+                       'title'     => $job->getTitle()->getDBkey(),
+                       'params'    => $job->getParams(),
+                       // Additional metadata
+                       'uid'       => $job->ignoreDuplicates()
+                               ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
+                               : wfRandomString( 32 ),
+                       'timestamp' => time() // UNIX timestamp
+               );
+       }
+
+       /**
+        * @param $fields array
+        * @return Job|bool
+        */
+       protected function getJobFromFields( array $fields ) {
+               $title = Title::makeTitleSafe( $fields['namespace'], $fields['title'] );
+               if ( $title ) {
+                       $job = Job::factory( $fields['type'], $title, $fields['params'] );
+                       $job->metadata['sourceFields'] = $fields;
+                       return $job;
+               }
+               return false;
+       }
+
+       /**
+        * @param $uid string Job UID
+        * @return bool Whether $uid is a SHA-1 hash based identifier for de-duplication
+        */
+       protected function isHashUid( $uid ) {
+               return strlen( $uid ) == 31;
+       }
+
+       /**
+        * Get a connection to the server that handles all sub-queues for this queue
+        *
+        * @return Array (server name, Redis instance)
+        * @throws MWException
+        */
+       protected function getConnection() {
+               $conn = $this->redisPool->getConnection( $this->server );
+               if ( !$conn ) {
+                       throw new MWException( "Unable to connect to redis server." );
+               }
+               return $conn;
+       }
+
+       /**
+        * @param $server string
+        * @param $conn RedisConnRef
+        * @param $e RedisException
+        * @throws MWException
+        */
+       protected function throwRedisException( $server, RedisConnRef $conn, $e ) {
+               $this->redisPool->handleException( $server, $conn, $e );
+               throw new MWException( "Redis server error: {$e->getMessage()}\n" );
+       }
+
+       /**
+        * @param $prop string
+        * @return string
+        */
+       private function getQueueKey( $prop ) {
+               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $prop );
+       }
+
+       /**
+        * @param string $signature Hash identifier of the root job
+        * @return string
+        */
+       private function getRootJobKey( $signature ) {
+               list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+               return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
+       }
+
+       /**
+        * @param $prop string
+        * @param $string string
+        * @return string
+        */
+       private function prefixWithQueueKey( $prop, $string ) {
+               return $this->getQueueKey( $prop ) . ':' . $string;
+       }
+
+       /**
+        * @param $prop string
+        * @param $items array
+        * @return Array
+        */
+       private function prefixValuesWithQueueKey( $prop, array $items ) {
+               $res = array();
+               foreach ( $items as $item ) {
+                       $res[] = $this->prefixWithQueueKey( $prop, $item );
+               }
+               return $res;
+       }
+
+       /**
+        * @param $prop string
+        * @param $items array
+        * @return Array
+        */
+       private function prefixKeysWithQueueKey( $prop, array $items ) {
+               $res = array();
+               foreach ( $items as $key => $item ) {
+                       $res[$this->prefixWithQueueKey( $prop, $key )] = $item;
+               }
+               return $res;
+       }
+}
index 23418e3..1b64b10 100644 (file)
@@ -46,9 +46,10 @@ final class DuplicateJob extends Job {
         */
        public static function newFromJob( Job $job ) {
                $djob = new self( $job->getTitle(), $job->getParams(), $job->getId() );
-               $djob->command = $job->getType();
-               $djob->params  = is_array( $djob->params ) ? $djob->params : array();
-               $djob->params  = array( 'isDuplicate' => true ) + $djob->params;
+               $djob->command  = $job->getType();
+               $djob->params   = is_array( $djob->params ) ? $djob->params : array();
+               $djob->params   = array( 'isDuplicate' => true ) + $djob->params;
+               $djob->metadata = $job->metadata;
                return $djob;
        }