X-Git-Url: http://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fjobqueue%2FJobQueueRedis.php;h=5e7a11571a730e93f3e58045b1c76d9523ba6d55;hb=1758a57245f9d7752b821aaef3dc5e1ba65da7be;hp=e9505bcfb4e55557bc26765053763465c9d11e49;hpb=1b4f3579017a46344a9f1857db241bc3909bb7f5;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/jobqueue/JobQueueRedis.php b/includes/jobqueue/JobQueueRedis.php index e9505bcfb4..b8a5ad2107 100644 --- a/includes/jobqueue/JobQueueRedis.php +++ b/includes/jobqueue/JobQueueRedis.php @@ -19,6 +19,8 @@ * * @file */ + +use MediaWiki\Logger\LoggerFactory; use Psr\Log\LoggerInterface; /** @@ -100,7 +102,7 @@ class JobQueueRedis extends JobQueue { "Non-daemonized mode is no longer supported. Please install the " . "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." ); } - $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' ); + $this->logger = LoggerFactory::getInstance( 'redis' ); } protected function supportedOrders() { @@ -134,7 +136,7 @@ class JobQueueRedis extends JobQueue { try { return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ); } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } } @@ -152,7 +154,7 @@ class JobQueueRedis extends JobQueue { return array_sum( $conn->exec() ); } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } } @@ -166,7 +168,7 @@ class JobQueueRedis extends JobQueue { try { return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } } @@ -180,7 +182,7 @@ class JobQueueRedis extends JobQueue { try { return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } } @@ -203,7 +205,7 @@ class JobQueueRedis extends JobQueue { } } - if ( !count( $items ) ) { + if ( $items === [] ) { return; // nothing to do } @@ -225,9 +227,9 @@ class JobQueueRedis extends JobQueue { $failed += count( $itemBatch ); } } - JobQueue::incrStats( 'inserts', $this->type, count( $items ) ); - JobQueue::incrStats( 'inserts_actual', $this->type, $pushed ); - JobQueue::incrStats( 'dupe_inserts', $this->type, + $this->incrStats( 'inserts', $this->type, count( $items ) ); + $this->incrStats( 'inserts_actual', $this->type, $pushed ); + $this->incrStats( 'dupe_inserts', $this->type, count( $items ) - $failed - $pushed ); if ( $failed > 0 ) { $err = "Could not insert {$failed} {$this->type} job(s)."; @@ -235,7 +237,7 @@ class JobQueueRedis extends JobQueue { throw new RedisException( $err ); } } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } } @@ -307,7 +309,7 @@ LUA; /** * @see JobQueue::doPop() - * @return Job|bool + * @return RunnableJob|bool * @throws JobQueueError */ protected function doPop() { @@ -321,7 +323,7 @@ LUA; break; // no jobs; nothing to do } - JobQueue::incrStats( 'pops', $this->type ); + $this->incrStats( 'pops', $this->type ); $item = $this->unserialize( $blob ); if ( $item === false ) { wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." ); @@ -332,7 +334,7 @@ LUA; $job = $this->getJobFromFields( $item ); // may be false } while ( !$job ); // job may be false if invalid } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } return $job; @@ -379,17 +381,17 @@ LUA; /** * @see JobQueue::doAck() - * @param Job $job - * @return Job|bool + * @param RunnableJob $job + * @return RunnableJob|bool * @throws UnexpectedValueException * @throws JobQueueError */ - protected function doAck( Job $job ) { - if ( !isset( $job->metadata['uuid'] ) ) { + protected function doAck( RunnableJob $job ) { + $uuid = $job->getMetadata( 'uuid' ); + if ( $uuid === null ) { throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." ); } - $uuid = $job->metadata['uuid']; $conn = $this->getConnection(); try { static $script = @@ -424,9 +426,9 @@ LUA; return false; } - JobQueue::incrStats( 'acks', $this->type ); + $this->incrStats( 'acks', $this->type ); } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } return true; @@ -457,17 +459,17 @@ LUA; // Update the timestamp of the last root job started at the location... return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } } /** * @see JobQueue::doIsRootJobOldDuplicate() - * @param Job $job + * @param IJobSpecification $job * @return bool * @throws JobQueueError */ - protected function doIsRootJobOldDuplicate( Job $job ) { + protected function doIsRootJobOldDuplicate( IJobSpecification $job ) { if ( !$job->hasRootJobParams() ) { return false; // job has no de-deplication info } @@ -478,8 +480,7 @@ LUA; // Get the last time this root job was enqueued $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); } catch ( RedisException $e ) { - $timestamp = false; - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } // Check if a new root job was started at the location after this one's... @@ -507,7 +508,7 @@ LUA; return $ok; } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } } @@ -521,7 +522,7 @@ LUA; try { $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ); } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } return $this->getJobIterator( $conn, $uids ); @@ -537,7 +538,7 @@ LUA; try { $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ); } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } return $this->getJobIterator( $conn, $uids ); @@ -553,7 +554,7 @@ LUA; try { $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 ); } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } return $this->getJobIterator( $conn, $uids ); @@ -569,7 +570,7 @@ LUA; try { $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ); } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } return $this->getJobIterator( $conn, $uids ); @@ -616,7 +617,7 @@ LUA; } } } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } return $sizes; @@ -626,12 +627,12 @@ LUA; * This function should not be called outside JobQueueRedis * * @param string $uid - * @param RedisConnRef $conn - * @return Job|bool Returns false if the job does not exist + * @param RedisConnRef|Redis $conn + * @return RunnableJob|bool Returns false if the job does not exist * @throws JobQueueError * @throws UnexpectedValueException */ - public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { + public function getJobFromUidInternal( $uid, $conn ) { try { $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ); if ( $data === false ) { @@ -639,18 +640,21 @@ LUA; } $item = $this->unserialize( $data ); if ( !is_array( $item ) ) { // this shouldn't happen - throw new UnexpectedValueException( "Could not find job with ID '$uid'." ); + throw new UnexpectedValueException( "Could not unserialize job with ID '$uid'." ); } - $title = Title::makeTitle( $item['namespace'], $item['title'] ); - $job = Job::factory( $item['type'], $title, $item['params'] ); - $job->metadata['uuid'] = $item['uuid']; - $job->metadata['timestamp'] = $item['timestamp']; + + $params = $item['params']; + $params += [ 'namespace' => $item['namespace'], 'title' => $item['title'] ]; + $job = $this->factoryJob( $item['type'], $params ); + $job->setMetadata( 'uuid', $item['uuid'] ); + $job->setMetadata( 'timestamp', $item['timestamp'] ); // Add in attempt count for debugging at showJobs.php - $job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid ); + $job->setMetadata( 'attempts', + $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid ) ); return $job; } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } } @@ -669,7 +673,7 @@ LUA; $queues[] = $this->decodeQueueName( $queue ); } } catch ( RedisException $e ) { - $this->throwRedisException( $conn, $e ); + throw $this->handleErrorAndMakeException( $conn, $e ); } return $queues; @@ -683,8 +687,8 @@ LUA; return [ // Fields that describe the nature of the job 'type' => $job->getType(), - 'namespace' => $job->getTitle()->getNamespace(), - 'title' => $job->getTitle()->getDBkey(), + 'namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL, + 'title' => $job->getParams()['title'] ?? '', 'params' => $job->getParams(), // Some jobs cannot run until a "release timestamp" 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, @@ -699,13 +703,15 @@ LUA; /** * @param array $fields - * @return Job|bool + * @return RunnableJob|bool */ protected function getJobFromFields( array $fields ) { - $title = Title::makeTitle( $fields['namespace'], $fields['title'] ); - $job = Job::factory( $fields['type'], $title, $fields['params'] ); - $job->metadata['uuid'] = $fields['uuid']; - $job->metadata['timestamp'] = $fields['timestamp']; + $params = $fields['params']; + $params += [ 'namespace' => $fields['namespace'], 'title' => $fields['title'] ]; + + $job = $this->factoryJob( $fields['type'], $params ); + $job->setMetadata( 'uuid', $fields['uuid'] ); + $job->setMetadata( 'timestamp', $fields['timestamp'] ); return $job; } @@ -749,7 +755,7 @@ LUA; /** * Get a connection to the server that handles all sub-queues for this queue * - * @return RedisConnRef + * @return RedisConnRef|Redis * @throws JobQueueConnectionError */ protected function getConnection() { @@ -765,18 +771,18 @@ LUA; /** * @param RedisConnRef $conn * @param RedisException $e - * @throws JobQueueError + * @return JobQueueError */ - protected function throwRedisException( RedisConnRef $conn, $e ) { + protected function handleErrorAndMakeException( RedisConnRef $conn, $e ) { $this->redisPool->handleError( $conn, $e ); - throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); + return new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); } /** * @return string JSON */ private function encodeQueueName() { - return json_encode( [ $this->type, $this->wiki ] ); + return json_encode( [ $this->type, $this->domain ] ); } /** @@ -809,8 +815,9 @@ LUA; */ private function getQueueKey( $prop, $type = null ) { $type = is_string( $type ) ? $type : $this->type; - list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); - $keyspace = $prefix ? "$db-$prefix" : $db; + + // Use wiki ID for b/c + $keyspace = WikiMap::getWikiIdFromDbDomain( $this->domain ); $parts = [ $keyspace, 'jobqueue', $type, $prop ];