X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjob%2FJobQueueRedis.php;h=9b9fe2d4e277f4648ab1a23568616c419de411a1;hb=8c0cbe6f830c4c7f181b2e070f07854ab0dc8d07;hp=67bb5a41ed55566f1d583b2e352defb6d96e4bed;hpb=164f334e53bb3ea854bf1f2a3c1202102ccac5df;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/job/JobQueueRedis.php b/includes/job/JobQueueRedis.php index 67bb5a41ed..9b9fe2d4e2 100644 --- a/includes/job/JobQueueRedis.php +++ b/includes/job/JobQueueRedis.php @@ -60,12 +60,16 @@ class JobQueueRedis extends JobQueue { /** @var RedisConnectionPool */ protected $redisPool; - protected $server; // string; server address - protected $compression; // string; compression method to use + /** @var string Server address */ + protected $server; + + /** @var string Compression method to use */ + protected $compression; const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed (7 days) - protected $key; // string; key to prefix the queue keys with (used for testing) + /** @var string Key to prefix the queue keys with (used for testing) */ + protected $key; /** * @params include: @@ -108,7 +112,7 @@ class JobQueueRedis extends JobQueue { /** * @see JobQueue::doGetSize() - * @return integer + * @return int * @throws MWException */ protected function doGetSize() { @@ -122,8 +126,8 @@ class JobQueueRedis extends JobQueue { /** * @see JobQueue::doGetAcquiredCount() - * @return integer - * @throws MWException + * @return int + * @throws JobQueueError */ protected function doGetAcquiredCount() { if ( $this->claimTTL <= 0 ) { @@ -134,6 +138,7 @@ class JobQueueRedis extends JobQueue { $conn->multi( Redis::PIPELINE ); $conn->zSize( $this->getQueueKey( 'z-claimed' ) ); $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); + return array_sum( $conn->exec() ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); @@ -142,8 +147,8 @@ class JobQueueRedis extends JobQueue { /** * @see JobQueue::doGetDelayedCount() - * @return integer - * @throws MWException + * @return int + * @throws JobQueueError */ protected function doGetDelayedCount() { if ( !$this->checkDelay ) { @@ -159,8 +164,8 @@ class JobQueueRedis extends JobQueue { /** * @see JobQueue::doGetAbandonedCount() - * @return integer - * @throws MWException + * @return int + * @throws JobQueueError */ protected function doGetAbandonedCount() { if ( $this->claimTTL <= 0 ) { @@ -179,7 +184,7 @@ class JobQueueRedis extends JobQueue { * @param array $jobs * @param $flags * @return bool - * @throws MWException + * @throws JobQueueError */ protected function doBatchPush( array $jobs, $flags ) { // Convert the jobs into field maps (de-duplicated against each other) @@ -217,6 +222,7 @@ class JobQueueRedis extends JobQueue { } if ( $failed > 0 ) { wfDebugLog( 'JobQueueRedis', "Could not insert {$failed} {$this->type} job(s)." ); + return false; } JobQueue::incrStats( 'job-insert', $this->type, count( $items ) ); @@ -232,7 +238,7 @@ class JobQueueRedis extends JobQueue { /** * @param RedisConnRef $conn * @param array $items List of results from JobQueueRedis::getNewJobFields() - * @return integer Number of jobs inserted (duplicates are ignored) + * @return int Number of jobs inserted (duplicates are ignored) * @throws RedisException */ protected function pushBlobs( RedisConnRef $conn, array $items ) { @@ -245,23 +251,24 @@ class JobQueueRedis extends JobQueue { } static $script = << 0 then -- Insert into delayed queue (release time as score) - redis.call('zAdd',KEYS[4],rtimestamp,id) + redis.call('zAdd',kDelayed,rtimestamp,id) else -- Insert into unclaimed queue - redis.call('lPush',KEYS[1],id) + redis.call('lPush',kUnclaimed,id) end if sha1 ~= '' then - redis.call('hSet',KEYS[2],id,sha1) - redis.call('hSet',KEYS[3],sha1,id) + redis.call('hSet',kSha1ById,id,sha1) + redis.call('hSet',kIdBySha1,sha1,id) end - redis.call('hSet',KEYS[5],id,blob) + redis.call('hSet',kData,id,blob) pushed = pushed + 1 end end @@ -285,7 +292,7 @@ LUA; /** * @see JobQueue::doPop() * @return Job|bool - * @throws MWException + * @throws JobQueueError */ protected function doPop() { $job = false; @@ -337,16 +344,17 @@ LUA; protected function popAndDeleteBlob( RedisConnRef $conn ) { static $script = <<luaEval( $script, array( @@ -399,7 +408,7 @@ LUA; * @see JobQueue::doAck() * @param Job $job * @return Job|bool - * @throws MWException + * @throws MWException|JobQueueError */ protected function doAck( Job $job ) { if ( !isset( $job->metadata['uuid'] ) ) { @@ -410,11 +419,12 @@ LUA; try { static $script = <<luaEval( $script, array( @@ -428,12 +438,14 @@ LUA; if ( !$res ) { wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job." ); + return false; } } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); } } + return true; } @@ -441,7 +453,7 @@ LUA; * @see JobQueue::doDeduplicateRootJob() * @param Job $job * @return bool - * @throws MWException + * @throws MWException|JobQueueError */ protected function doDeduplicateRootJob( Job $job ) { if ( !$job->hasRootJobParams() ) { @@ -457,6 +469,7 @@ LUA; if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { return true; // a newer version of this root job was enqueued } + // Update the timestamp of the last root job started at the location... return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks } catch ( RedisException $e ) { @@ -468,6 +481,7 @@ LUA; * @see JobQueue::doIsRootJobOldDuplicate() * @param Job $job * @return bool + * @throws JobQueueError */ protected function doIsRootJobOldDuplicate( Job $job ) { if ( !$job->hasRootJobParams() ) { @@ -490,6 +504,7 @@ LUA; /** * @see JobQueue::doDelete() * @return bool + * @throws JobQueueError */ protected function doDelete() { static $props = array( 'l-unclaimed', 'z-claimed', 'z-abandoned', @@ -501,6 +516,7 @@ LUA; foreach ( $props as $prop ) { $keys[] = $this->getQueueKey( $prop ); } + return ( $conn->delete( $keys ) !== false ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); @@ -515,12 +531,15 @@ LUA; $conn = $this->getConnection(); try { $that = $this; + return new MappedIterator( $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ), - function( $uid ) use ( $that, $conn ) { + function ( $uid ) use ( $that, $conn ) { return $that->getJobFromUidInternal( $uid, $conn ); }, - array( 'accept' => function ( $job ) { return is_object( $job ); } ) + array( 'accept' => function ( $job ) { + return is_object( $job ); + } ) ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); @@ -535,12 +554,15 @@ LUA; $conn = $this->getConnection(); try { $that = $this; + return new MappedIterator( // delayed jobs $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ), - function( $uid ) use ( $that, $conn ) { + function ( $uid ) use ( $that, $conn ) { return $that->getJobFromUidInternal( $uid, $conn ); }, - array( 'accept' => function ( $job ) { return is_object( $job ); } ) + array( 'accept' => function ( $job ) { + return is_object( $job ); + } ) ); } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); @@ -573,6 +595,7 @@ LUA; } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); } + return $sizes; } @@ -582,7 +605,7 @@ LUA; * @param $uid string * @param $conn RedisConnRef * @return Job|bool Returns false if the job does not exist - * @throws MWException + * @throws MWException|JobQueueError */ public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { try { @@ -597,6 +620,7 @@ LUA; $title = Title::makeTitle( $item['namespace'], $item['title'] ); $job = Job::factory( $item['type'], $title, $item['params'] ); $job->metadata['uuid'] = $item['uuid']; + return $job; } catch ( RedisException $e ) { $this->throwRedisException( $this->server, $conn, $e ); @@ -606,8 +630,8 @@ LUA; /** * Release any ready delayed jobs into the queue * - * @return integer Number of jobs released - * @throws MWException + * @return int Number of jobs released + * @throws JobQueueError */ public function releaseReadyDelayedJobs() { $count = 0; @@ -616,12 +640,13 @@ LUA; try { static $script = <<claimTTL <= 0 ) { // sanity @@ -660,33 +685,34 @@ LUA; $now = time(); static $script = << 300 // 5 minutes ); } + return $tasks; } /** - * @param $job Job + * @param Job $job * @return array */ protected function getNewJobFields( Job $job ) { return array( // Fields that describe the nature of the job - 'type' => $job->getType(), - 'namespace' => $job->getTitle()->getNamespace(), - 'title' => $job->getTitle()->getDBkey(), - 'params' => $job->getParams(), + 'type' => $job->getType(), + 'namespace' => $job->getTitle()->getNamespace(), + 'title' => $job->getTitle()->getDBkey(), + 'params' => $job->getParams(), // Some jobs cannot run until a "release timestamp" 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, // Additional job metadata - 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ), - 'sha1' => $job->ignoreDuplicates() - ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) - : '', - 'timestamp' => time() // UNIX timestamp + 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ), + 'sha1' => $job->ignoreDuplicates() + ? wfBaseConvert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) + : '', + 'timestamp' => time() // UNIX timestamp ); } @@ -768,8 +795,10 @@ LUA; if ( $title ) { $job = Job::factory( $fields['type'], $title, $fields['params'] ); $job->metadata['uuid'] = $fields['uuid']; + return $job; } + return false; } @@ -780,10 +809,12 @@ LUA; protected function serialize( array $fields ) { $blob = serialize( $fields ); if ( $this->compression === 'gzip' - && strlen( $blob ) >= 1024 && function_exists( 'gzdeflate' ) ) - { + && strlen( $blob ) >= 1024 + && function_exists( 'gzdeflate' ) + ) { $object = (object)array( 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ); $blobz = serialize( $object ); + return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob; } else { return $blob; @@ -803,20 +834,22 @@ LUA; $fields = false; } } + return is_array( $fields ) ? $fields : false; } /** * Get a connection to the server that handles all sub-queues for this queue * - * @return Array (server name, Redis instance) - * @throws MWException + * @return RedisConnRef + * @throws JobQueueConnectionError */ protected function getConnection() { $conn = $this->redisPool->getConnection( $this->server ); if ( !$conn ) { throw new JobQueueConnectionError( "Unable to connect to redis server." ); } + return $conn; } @@ -824,7 +857,7 @@ LUA; * @param $server string * @param $conn RedisConnRef * @param $e RedisException - * @throws MWException + * @throws JobQueueError */ protected function throwRedisException( $server, RedisConnRef $conn, $e ) { $this->redisPool->handleException( $server, $conn, $e );