servers = isset( $params['redisServers'] ) ? $params['redisServers'] : [ $params['redisServer'] ]; // b/c
		// Force the 'none' serializer on the pool configuration before the pool is built.
		$params['redisConfig']['serializer'] = 'none';
		$this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
		$this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
	}

	/**
	 * Nothing to do here: the "empty" state is managed by the service.
	 *
	 * @param string $wiki
	 * @param string $type
	 * @return bool Always true
	 */
	protected function doNotifyQueueEmpty( $wiki, $type ) {
		// managed by the service
		return true;
	}

	/**
	 * Nothing to do here: the "non-empty" state is managed by the service.
	 *
	 * @param string $wiki
	 * @param string $type
	 * @return bool Always true
	 */
	protected function doNotifyQueueNonEmpty( $wiki, $type ) {
		// managed by the service
		return true;
	}

	/**
	 * Read the ready-queue hash from redis and group its entries by job type.
	 *
	 * The hash must be an array containing an '_epoch' field, otherwise an
	 * UnexpectedValueException is thrown. The '_epoch' field itself is not
	 * part of the result.
	 *
	 * @return array Map of (job type => list of wikis); empty when no
	 *  connection is available or a RedisException occurs
	 * @throws UnexpectedValueException When no queue listing is found
	 */
	protected function doGetAllReadyWikiQueues() {
		$conn = $this->getConnection();
		if ( !$conn ) {
			return [];
		}

		try {
			$readyMap = $conn->hGetAll( $this->getReadyQueueKey() );
			if ( !is_array( $readyMap ) || !isset( $readyMap['_epoch'] ) ) {
				throw new UnexpectedValueException(
					"No queue listing found; make sure redisJobChronService is running."
				);
			}

			// The epoch marker is bookkeeping, not a queue entry
			unset( $readyMap['_epoch'] );

			// (type => list of wikis); only the hash field names matter here
			$pendingDBs = [];
			foreach ( array_keys( $readyMap ) as $encodedName ) {
				list( $jobType, $wikiId ) = $this->decodeQueueName( $encodedName );
				$pendingDBs[$jobType][] = $wikiId;
			}

			return $pendingDBs;
		} catch ( RedisException $e ) {
			// Hand the failure to the pool and report no ready queues
			$this->redisPool->handleError( $conn, $e );

			return [];
		}
	}

	/**
	 * @return bool Always true
	 */
	protected function doPurge() {
		// fully and only refreshed by the service
		return true;
	}

	/**
	 * Get a connection to the server that handles all sub-queues for this queue
	 *
	 * The configured servers are tried in order; the first usable connection wins.
	 *
	 * @return RedisConnRef|bool Returns false on failure
	 * @throws MWException
	 */
	protected function getConnection() {
		$connection = false;
		foreach ( $this->servers as $candidate ) {
			$connection = $this->redisPool->getConnection( $candidate, $this->logger );
			if ( $connection ) {
				return $connection;
			}
		}

		// No server yielded a usable connection (or none are configured)
		return $connection;
	}

	/**
	 * @return string Key of the redis hash listing the ready queues
	 */
	private function getReadyQueueKey() {
		// global
		return "jobqueue:aggregator:h-ready-queues:v2";
	}

	/**
	 * Split an encoded "type/wiki" queue name and URL-decode both halves.
	 *
	 * @param string $name
	 * @return string[] Tuple of (decoded type, decoded wiki)
	 */
	private function decodeQueueName( $name ) {
		// Split on the first '/' only; the remainder stays in the second part
		$pieces = explode( '/', $name, 2 );
		list( $encodedType, $encodedWiki ) = $pieces;

		return [ rawurldecode( $encodedType ), rawurldecode( $encodedWiki ) ];
	}
}