*
* @file
*/
+
+use MediaWiki\Logger\LoggerFactory;
use Psr\Log\LoggerInterface;
/**
parent::__construct( $params );
$params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
$this->server = $params['redisServer'];
- $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
+ $this->compression = $params['compression'] ?? 'none';
$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
if ( empty( $params['daemonized'] ) ) {
throw new InvalidArgumentException(
"Non-daemonized mode is no longer supported. Please install the " .
"mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
}
- $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
+ $this->logger = LoggerFactory::getInstance( 'redis' );
}
protected function supportedOrders() {
try {
return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
}
return array_sum( $conn->exec() );
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
}
try {
return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
}
try {
return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
}
}
}
- if ( !count( $items ) ) {
+ if ( $items === [] ) {
return; // nothing to do
}
$failed += count( $itemBatch );
}
}
- JobQueue::incrStats( 'inserts', $this->type, count( $items ) );
- JobQueue::incrStats( 'inserts_actual', $this->type, $pushed );
- JobQueue::incrStats( 'dupe_inserts', $this->type,
+ $this->incrStats( 'inserts', $this->type, count( $items ) );
+ $this->incrStats( 'inserts_actual', $this->type, $pushed );
+ $this->incrStats( 'dupe_inserts', $this->type,
count( $items ) - $failed - $pushed );
if ( $failed > 0 ) {
$err = "Could not insert {$failed} {$this->type} job(s).";
throw new RedisException( $err );
}
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
}
/**
* @see JobQueue::doPop()
- * @return Job|bool
+ * @return RunnableJob|bool
* @throws JobQueueError
*/
protected function doPop() {
break; // no jobs; nothing to do
}
- JobQueue::incrStats( 'pops', $this->type );
+ $this->incrStats( 'pops', $this->type );
$item = $this->unserialize( $blob );
if ( $item === false ) {
wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
$job = $this->getJobFromFields( $item ); // may be false
} while ( !$job ); // job may be false if invalid
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
return $job;
/**
* @see JobQueue::doAck()
- * @param Job $job
- * @return Job|bool
+ * @param RunnableJob $job
+ * @return RunnableJob|bool
* @throws UnexpectedValueException
* @throws JobQueueError
*/
- protected function doAck( Job $job ) {
- if ( !isset( $job->metadata['uuid'] ) ) {
+ protected function doAck( RunnableJob $job ) {
+ $uuid = $job->getMetadata( 'uuid' );
+ if ( $uuid === null ) {
throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
}
- $uuid = $job->metadata['uuid'];
$conn = $this->getConnection();
try {
static $script =
return false;
}
- JobQueue::incrStats( 'acks', $this->type );
+ $this->incrStats( 'acks', $this->type );
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
return true;
// Update the timestamp of the last root job started at the location...
return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
}
/**
* @see JobQueue::doIsRootJobOldDuplicate()
- * @param Job $job
+ * @param IJobSpecification $job
* @return bool
* @throws JobQueueError
*/
- protected function doIsRootJobOldDuplicate( Job $job ) {
+ protected function doIsRootJobOldDuplicate( IJobSpecification $job ) {
if ( !$job->hasRootJobParams() ) {
return false; // job has no de-deplication info
}
// Get the last time this root job was enqueued
$timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
} catch ( RedisException $e ) {
- $timestamp = false;
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
// Check if a new root job was started at the location after this one's...
return $ok;
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
}
try {
$uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
return $this->getJobIterator( $conn, $uids );
try {
$uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
return $this->getJobIterator( $conn, $uids );
try {
$uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
return $this->getJobIterator( $conn, $uids );
try {
$uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
return $this->getJobIterator( $conn, $uids );
}
}
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
return $sizes;
* This function should not be called outside JobQueueRedis
*
* @param string $uid
- * @param RedisConnRef $conn
- * @return Job|bool Returns false if the job does not exist
+ * @param RedisConnRef|Redis $conn
+ * @return RunnableJob|bool Returns false if the job does not exist
* @throws JobQueueError
* @throws UnexpectedValueException
*/
- public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
+ public function getJobFromUidInternal( $uid, $conn ) {
try {
$data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
if ( $data === false ) {
}
$item = $this->unserialize( $data );
if ( !is_array( $item ) ) { // this shouldn't happen
- throw new UnexpectedValueException( "Could not find job with ID '$uid'." );
+ throw new UnexpectedValueException( "Could not unserialize job with ID '$uid'." );
}
- $title = Title::makeTitle( $item['namespace'], $item['title'] );
- $job = Job::factory( $item['type'], $title, $item['params'] );
- $job->metadata['uuid'] = $item['uuid'];
- $job->metadata['timestamp'] = $item['timestamp'];
+
+ $params = $item['params'];
+ $params += [ 'namespace' => $item['namespace'], 'title' => $item['title'] ];
+ $job = $this->factoryJob( $item['type'], $params );
+ $job->setMetadata( 'uuid', $item['uuid'] );
+ $job->setMetadata( 'timestamp', $item['timestamp'] );
// Add in attempt count for debugging at showJobs.php
- $job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid );
+ $job->setMetadata( 'attempts',
+ $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid ) );
return $job;
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
}
$queues[] = $this->decodeQueueName( $queue );
}
} catch ( RedisException $e ) {
- $this->throwRedisException( $conn, $e );
+ throw $this->handleErrorAndMakeException( $conn, $e );
}
return $queues;
return [
// Fields that describe the nature of the job
'type' => $job->getType(),
- 'namespace' => $job->getTitle()->getNamespace(),
- 'title' => $job->getTitle()->getDBkey(),
+ 'namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
+ 'title' => $job->getParams()['title'] ?? '',
'params' => $job->getParams(),
// Some jobs cannot run until a "release timestamp"
'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
/**
* @param array $fields
- * @return Job|bool
+ * @return RunnableJob|bool
*/
protected function getJobFromFields( array $fields ) {
- $title = Title::makeTitle( $fields['namespace'], $fields['title'] );
- $job = Job::factory( $fields['type'], $title, $fields['params'] );
- $job->metadata['uuid'] = $fields['uuid'];
- $job->metadata['timestamp'] = $fields['timestamp'];
+ $params = $fields['params'];
+ $params += [ 'namespace' => $fields['namespace'], 'title' => $fields['title'] ];
+
+ $job = $this->factoryJob( $fields['type'], $params );
+ $job->setMetadata( 'uuid', $fields['uuid'] );
+ $job->setMetadata( 'timestamp', $fields['timestamp'] );
return $job;
}
/**
* Get a connection to the server that handles all sub-queues for this queue
*
- * @return RedisConnRef
+ * @return RedisConnRef|Redis
* @throws JobQueueConnectionError
*/
protected function getConnection() {
/**
* @param RedisConnRef $conn
* @param RedisException $e
- * @throws JobQueueError
+ * @return JobQueueError
*/
- protected function throwRedisException( RedisConnRef $conn, $e ) {
+ protected function handleErrorAndMakeException( RedisConnRef $conn, $e ) {
$this->redisPool->handleError( $conn, $e );
- throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
+ return new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
}
/**
* @return string JSON
*/
private function encodeQueueName() {
- return json_encode( [ $this->type, $this->wiki ] );
+ return json_encode( [ $this->type, $this->domain ] );
}
/**
*/
private function getQueueKey( $prop, $type = null ) {
$type = is_string( $type ) ? $type : $this->type;
- list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- $keyspace = $prefix ? "$db-$prefix" : $db;
+
+ // Use wiki ID for b/c
+ $keyspace = WikiMap::getWikiIdFromDbDomain( $this->domain );
$parts = [ $keyspace, 'jobqueue', $type, $prop ];