Merge "jobqueue: migrate root job deduplication to the WAN cache"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Sat, 13 Jul 2019 23:30:56 +0000 (23:30 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Sat, 13 Jul 2019 23:30:56 +0000 (23:30 +0000)
1  2 
includes/jobqueue/JobQueueRedis.php

@@@ -19,8 -19,6 +19,8 @@@
   *
   * @file
   */
 +
 +use MediaWiki\Logger\LoggerFactory;
  use Psr\Log\LoggerInterface;
  
  /**
@@@ -102,7 -100,7 +102,7 @@@ class JobQueueRedis extends JobQueue 
                                "Non-daemonized mode is no longer supported. Please install the " .
                                "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
                }
 -              $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
 +              $this->logger = LoggerFactory::getInstance( 'redis' );
        }
  
        protected function supportedOrders() {
                try {
                        return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
        }
  
  
                        return array_sum( $conn->exec() );
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
        }
  
                try {
                        return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
        }
  
                try {
                        return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
        }
  
                                throw new RedisException( $err );
                        }
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
        }
  
@@@ -334,7 -332,7 +334,7 @@@ LUA
                                $job = $this->getJobFromFields( $item ); // may be false
                        } while ( !$job ); // job may be false if invalid
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
  
                return $job;
@@@ -428,7 -426,7 +428,7 @@@ LUA
  
                        $this->incrStats( 'acks', $this->type );
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
  
                return true;
  
                $conn = $this->getConnection();
                try {
-                       $timestamp = $conn->get( $key ); // current last timestamp of this job
+                       $timestamp = $conn->get( $key ); // last known timestamp of such a root job
                        if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
                                return true; // a newer version of this root job was enqueued
                        }
                        // Update the timestamp of the last root job started at the location...
                        return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
        }
  
                        // Get the last time this root job was enqueued
                        $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
                } catch ( RedisException $e ) {
 -                      $timestamp = false;
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
  
                // Check if a new root job was started at the location after this one's...
  
                        return $ok;
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
        }
  
                try {
                        $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
  
                return $this->getJobIterator( $conn, $uids );
                try {
                        $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
  
                return $this->getJobIterator( $conn, $uids );
                try {
                        $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
  
                return $this->getJobIterator( $conn, $uids );
                try {
                        $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
  
                return $this->getJobIterator( $conn, $uids );
                                }
                        }
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
  
                return $sizes;
         * This function should not be called outside JobQueueRedis
         *
         * @param string $uid
 -       * @param RedisConnRef $conn
 +       * @param RedisConnRef|Redis $conn
         * @return RunnableJob|bool Returns false if the job does not exist
         * @throws JobQueueError
         * @throws UnexpectedValueException
         */
 -      public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
 +      public function getJobFromUidInternal( $uid, $conn ) {
                try {
                        $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
                        if ( $data === false ) {
  
                        return $job;
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
        }
  
                                $queues[] = $this->decodeQueueName( $queue );
                        }
                } catch ( RedisException $e ) {
 -                      $this->throwRedisException( $conn, $e );
 +                      throw $this->handleErrorAndMakeException( $conn, $e );
                }
  
                return $queues;
        /**
         * Get a connection to the server that handles all sub-queues for this queue
         *
 -       * @return RedisConnRef
 +       * @return RedisConnRef|Redis
         * @throws JobQueueConnectionError
         */
        protected function getConnection() {
        /**
         * @param RedisConnRef $conn
         * @param RedisException $e
 -       * @throws JobQueueError
 +       * @return JobQueueError
         */
 -      protected function throwRedisException( RedisConnRef $conn, $e ) {
 +      protected function handleErrorAndMakeException( RedisConnRef $conn, $e ) {
                $this->redisPool->handleError( $conn, $e );
 -              throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
 +              return new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
        }
  
        /**