Merge "Reset scoped session for upload jobs after deferred updates"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Tue, 1 Mar 2016 11:30:55 +0000 (11:30 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Tue, 1 Mar 2016 11:30:55 +0000 (11:30 +0000)
1  2 
includes/jobqueue/Job.php
includes/jobqueue/JobRunner.php
includes/jobqueue/jobs/AssembleUploadChunksJob.php
includes/jobqueue/jobs/PublishStashedFileJob.php

@@@ -36,7 -36,7 +36,7 @@@ abstract class Job implements IJobSpeci
        public $params;
  
        /** @var array Additional queue metadata */
 -      public $metadata = array();
 +      public $metadata = [];
  
        /** @var Title */
        protected $title;
@@@ -47,6 -47,9 +47,9 @@@
        /** @var string Text for error that occurred last */
        protected $error;
  
+       /** @var callable[] */
+       protected $teardownCallbacks = [];
        /**
         * Run the job
         * @return bool Success
@@@ -62,7 -65,7 +65,7 @@@
         * @throws MWException
         * @return Job
         */
 -      public static function factory( $command, Title $title, $params = array() ) {
 +      public static function factory( $command, Title $title, $params = [] ) {
                global $wgJobClasses;
  
                if ( isset( $wgJobClasses[$command] ) ) {
@@@ -85,7 -88,7 +88,7 @@@
        public function __construct( $command, $title, $params = false ) {
                $this->command = $command;
                $this->title = $title;
 -              $this->params = is_array( $params ) ? $params : array(); // sanity
 +              $this->params = is_array( $params ) ? $params : []; // sanity
  
                // expensive jobs may set this to true
                $this->removeDuplicates = false;
         * @since 1.21
         */
        public function getDeduplicationInfo() {
 -              $info = array(
 +              $info = [
                        'type' => $this->getType(),
                        'namespace' => $this->getTitle()->getNamespace(),
                        'title' => $this->getTitle()->getDBkey(),
                        'params' => $this->getParams()
 -              );
 +              ];
                if ( is_array( $info['params'] ) ) {
                        // Identical jobs with different "root" jobs should count as duplicates
                        unset( $info['params']['rootJobSignature'] );
         * @since 1.21
         */
        public static function newRootJobParams( $key ) {
 -              return array(
 +              return [
                        'rootJobIsSelf'    => true,
                        'rootJobSignature' => sha1( $key ),
                        'rootJobTimestamp' => wfTimestampNow()
 -              );
 +              ];
        }
  
        /**
         * @since 1.21
         */
        public function getRootJobParams() {
 -              return array(
 +              return [
                        'rootJobSignature' => isset( $this->params['rootJobSignature'] )
                                ? $this->params['rootJobSignature']
                                : null,
                        'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] )
                                ? $this->params['rootJobTimestamp']
                                : null
 -              );
 +              ];
        }
  
        /**
                return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] );
        }
  
+       /**
+        * @param callable $callback
+        * @since 1.27
+        */
+       protected function addTeardownCallback( $callback ) {
+               $this->teardownCallbacks[] = $callback;
+       }
+       /**
+        * Do any final cleanup after run(), deferred updates, and all DB commits happen
+        *
+        * @since 1.27
+        */
+       public function teardown() {
+               foreach ( $this->teardownCallbacks as $callback ) {
+                       call_user_func( $callback );
+               }
+       }
        /**
         * Insert a single job into the queue.
         * @return bool True on success
                                        $paramString .= ' ';
                                }
                                if ( is_array( $value ) ) {
 -                                      $filteredValue = array();
 +                                      $filteredValue = [];
                                        foreach ( $value as $k => $v ) {
                                                if ( is_scalar( $v ) ) {
                                                        $filteredValue[$k] = $truncFunc( $v );
@@@ -80,8 -80,7 +80,8 @@@ class JobRunner implements LoggerAwareI
         * The response map also has:
         *   - backoffs : the (job type => seconds) map of backoff times
         *   - elapsed  : the total time spent running tasks in ms
 -       *   - reached  : the reason the script finished, one of (none-ready, job-limit, time-limit)
 +       *   - reached  : the reason the script finished, one of (none-ready, job-limit, time-limit,
 +       *  memory-limit)
         *
         * This method outputs status information only if a debug handler was set.
         * Any exceptions are caught and logged, but are not reported as output.
@@@ -96,7 -95,7 +96,7 @@@
        public function run( array $options ) {
                global $wgJobClasses, $wgTrxProfilerLimits;
  
 -              $response = array( 'jobs' => array(), 'reached' => 'none-ready' );
 +              $response = [ 'jobs' => [], 'reached' => 'none-ready' ];
  
                $type = isset( $options['type'] ) ? $options['type'] : false;
                $maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : false;
                $trxProfiler->setExpectations( $wgTrxProfilerLimits['JobRunner'], __METHOD__ );
  
                // Some jobs types should not run until a certain timestamp
 -              $backoffs = array(); // map of (type => UNIX expiry)
 -              $backoffDeltas = array(); // map of (type => seconds)
 +              $backoffs = []; // map of (type => UNIX expiry)
 +              $backoffDeltas = []; // map of (type => seconds)
                $wait = 'wait'; // block to read backoffs the first time
  
                $group = JobQueueGroup::singleton();
                do {
                        // Sync the persistent backoffs with concurrent runners
                        $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
 -                      $blacklist = $noThrottle ? array() : array_keys( $backoffs );
 +                      $blacklist = $noThrottle ? [] : array_keys( $backoffs );
                        $wait = 'nowait'; // less important now
  
                        if ( $type === false ) {
                                                : $ttw;
                                }
  
 -                              $response['jobs'][] = array(
 +                              $response['jobs'][] = [
                                        'type'   => $jType,
                                        'status' => ( $info['status'] === false ) ? 'failed' : 'ok',
                                        'error'  => $info['error'],
                                        'time'   => $info['timeMs']
 -                              );
 +                              ];
                                $timeMsTotal += $info['timeMs'];
  
                                // Break out if we hit the job count or wall time limits...
                                $timePassed = microtime( true ) - $lastCheckTime;
                                if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
                                        try {
 -                                              wfGetLBFactory()->waitForReplication( array(
 +                                              wfGetLBFactory()->waitForReplication( [
                                                        'ifWritesSince' => $lastCheckTime,
                                                        'timeout' => self::MAX_ALLOWED_LAG
 -                                              ) );
 +                                              ] );
                                        } catch ( DBReplicationWaitError $e ) {
                                                $response['reached'] = 'slave-lag-limit';
                                                break;
  
                        DeferredUpdates::doUpdates();
                        $this->commitMasterChanges( $job );
+                       $job->teardown();
                } catch ( Exception $e ) {
                        MWExceptionHandler::rollbackMasterChangesAndLog( $e );
                        $status = false;
                        $stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
                }
                // Record root job age for jobs being run
 -              $root = $job->getRootJobParams();
 -              if ( $root['rootJobTimestamp'] ) {
 -                      $age = max( 0, $popTime - wfTimestamp( TS_UNIX, $root['rootJobTimestamp'] ) );
 +              $rootTimestamp = $job->getRootJobParams()['rootJobTimestamp'];
 +              if ( $rootTimestamp ) {
 +                      $age = max( 0, $popTime - wfTimestamp( TS_UNIX, $rootTimestamp ) );
                        $stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
                }
                // Track the execution time for jobs
                        $this->debugCallback( $msg );
                }
  
 -              return array( 'status' => $status, 'error' => $error, 'timeMs' => $timeMs );
 +              return [ 'status' => $status, 'error' => $error, 'timeMs' => $timeMs ];
        }
  
        /**
         * @return int|null Max memory RSS in kilobytes
         */
        private function getMaxRssKb() {
 -              $info = wfGetRusage() ?: array();
 +              $info = wfGetRusage() ?: [];
                // see http://linux.die.net/man/2/getrusage
                return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null;
        }
                        flock( $handle, LOCK_UN );
                        fclose( $handle );
                        $ctime = microtime( true );
 -                      $cBackoffs = json_decode( $content, true ) ?: array();
 +                      $cBackoffs = json_decode( $content, true ) ?: [];
                        foreach ( $cBackoffs as $type => $timestamp ) {
                                if ( $timestamp < $ctime ) {
                                        unset( $cBackoffs[$type] );
                                }
                        }
                } else {
 -                      $cBackoffs = array();
 +                      $cBackoffs = [];
                }
  
                return $cBackoffs;
                }
                $ctime = microtime( true );
                $content = stream_get_contents( $handle );
 -              $cBackoffs = json_decode( $content, true ) ?: array();
 +              $cBackoffs = json_decode( $content, true ) ?: [];
                foreach ( $deltas as $type => $seconds ) {
                        $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
                                ? $cBackoffs[$type] + $seconds
                flock( $handle, LOCK_UN );
                fclose( $handle );
  
 -              $deltas = array();
 +              $deltas = [];
  
                return $cBackoffs;
        }
        private function checkMemoryOK() {
                static $maxBytes = null;
                if ( $maxBytes === null ) {
 -                      $m = array();
 +                      $m = [];
                        if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) {
                                list( , $num, $unit ) = $m;
 -                              $conv = array( 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 );
 +                              $conv = [ 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ];
                                $maxBytes = $num * $conv[strtolower( $unit )];
                        } else {
                                $maxBytes = 0;
         */
        private function debugCallback( $msg ) {
                if ( $this->debug ) {
 -                      call_user_func_array( $this->debug, array( wfTimestamp( TS_DB ) . " $msg\n" ) );
 +                      call_user_func_array( $this->debug, [ wfTimestamp( TS_DB ) . " $msg\n" ] );
                }
        }
  
@@@ -35,6 -35,10 +35,10 @@@ class AssembleUploadChunksJob extends J
        public function run() {
                /** @noinspection PhpUnusedLocalVariableInspection */
                $scope = RequestContext::importScopedSession( $this->params['session'] );
+               $this->addTeardownCallback( function () use ( &$scope ) {
+                       ScopedCallback::consume( $scope ); // T126450
+               } );
                $context = RequestContext::getMain();
                $user = $context->getUser();
                try {
@@@ -47,7 -51,7 +51,7 @@@
                        UploadBase::setSessionStatus(
                                $user,
                                $this->params['filekey'],
 -                              array( 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() )
 +                              [ 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() ]
                        );
  
                        $upload = new UploadFromChunks( $user );
@@@ -63,7 -67,7 +67,7 @@@
                                UploadBase::setSessionStatus(
                                        $user,
                                        $this->params['filekey'],
 -                                      array( 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status )
 +                                      [ 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status ]
                                );
                                $this->setLastError( $status->getWikiText() );
  
                        UploadBase::setSessionStatus(
                                $user,
                                $this->params['filekey'],
 -                              array(
 +                              [
                                        'result' => 'Success',
                                        'stage' => 'assembling',
                                        'filekey' => $newFileKey,
                                        'imageinfo' => $imageInfo,
                                        'status' => Status::newGood()
 -                              )
 +                              ]
                        );
                } catch ( Exception $e ) {
                        UploadBase::setSessionStatus(
                                $user,
                                $this->params['filekey'],
 -                              array(
 +                              [
                                        'result' => 'Failure',
                                        'stage' => 'assembling',
                                        'status' => Status::newFatal( 'api-error-stashfailed' )
 -                              )
 +                              ]
                        );
                        $this->setLastError( get_class( $e ) . ": " . $e->getMessage() );
                        // To be extra robust.
        public function getDeduplicationInfo() {
                $info = parent::getDeduplicationInfo();
                if ( is_array( $info['params'] ) ) {
 -                      $info['params'] = array( 'filekey' => $info['params']['filekey'] );
 +                      $info['params'] = [ 'filekey' => $info['params']['filekey'] ];
                }
  
                return $info;
@@@ -37,6 -37,10 +37,10 @@@ class PublishStashedFileJob extends Jo
        public function run() {
                /** @noinspection PhpUnusedLocalVariableInspection */
                $scope = RequestContext::importScopedSession( $this->params['session'] );
+               $this->addTeardownCallback( function () use ( &$scope ) {
+                       ScopedCallback::consume( $scope ); // T126450
+               } );
                $context = RequestContext::getMain();
                $user = $context->getUser();
                try {
@@@ -49,7 -53,7 +53,7 @@@
                        UploadBase::setSessionStatus(
                                $user,
                                $this->params['filekey'],
 -                              array( 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() )
 +                              [ 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() ]
                        );
  
                        $upload = new UploadFromStash( $user );
                        $verification = $upload->verifyUpload();
                        if ( $verification['status'] !== UploadBase::OK ) {
                                $status = Status::newFatal( 'verification-error' );
 -                              $status->value = array( 'verification' => $verification );
 +                              $status->value = [ 'verification' => $verification ];
                                UploadBase::setSessionStatus(
                                        $user,
                                        $this->params['filekey'],
 -                                      array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
 +                                      [ 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ]
                                );
                                $this->setLastError( "Could not verify upload." );
  
                                $this->params['text'],
                                $this->params['watch'],
                                $user,
 -                              isset( $this->params['tags'] ) ? $this->params['tags'] : array()
 +                              isset( $this->params['tags'] ) ? $this->params['tags'] : []
                        );
                        if ( !$status->isGood() ) {
                                UploadBase::setSessionStatus(
                                        $user,
                                        $this->params['filekey'],
 -                                      array( 'result' => 'Failure', 'stage' => 'publish', 'status' => $status )
 +                                      [ 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ]
                                );
                                $this->setLastError( $status->getWikiText() );
  
                        UploadBase::setSessionStatus(
                                $user,
                                $this->params['filekey'],
 -                              array(
 +                              [
                                        'result' => 'Success',
                                        'stage' => 'publish',
                                        'filename' => $upload->getLocalFile()->getName(),
                                        'imageinfo' => $imageInfo,
                                        'status' => Status::newGood()
 -                              )
 +                              ]
                        );
                } catch ( Exception $e ) {
                        UploadBase::setSessionStatus(
                                $user,
                                $this->params['filekey'],
 -                              array(
 +                              [
                                        'result' => 'Failure',
                                        'stage' => 'publish',
                                        'status' => Status::newFatal( 'api-error-publishfailed' )
 -                              )
 +                              ]
                        );
                        $this->setLastError( get_class( $e ) . ": " . $e->getMessage() );
                        // To prevent potential database referential integrity issues.
        public function getDeduplicationInfo() {
                $info = parent::getDeduplicationInfo();
                if ( is_array( $info['params'] ) ) {
 -                      $info['params'] = array( 'filekey' => $info['params']['filekey'] );
 +                      $info['params'] = [ 'filekey' => $info['params']['filekey'] ];
                }
  
                return $info;