Merge "Made SSL validation in Curl HTTP requests the default."
[lhc/web/wiklou.git] / includes / job / JobQueueDB.php
index 223ef41..cbb2391 100644 (file)
  * @since 1.21
  */
 class JobQueueDB extends JobQueue {
-       const CACHE_TTL      = 300; // integer; seconds
-       const MAX_JOB_RANDOM = 2147483647; // 2^31 - 1; used for job_random
+       const CACHE_TTL      = 300; // integer; seconds to cache queue information
+       const MAX_AGE_PRUNE  = 604800; // integer; seconds a job can live once claimed
+       const MAX_ATTEMPTS   = 3; // integer; number of times to try a job
+       const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
 
        /**
         * @see JobQueue::doIsEmpty()
@@ -123,9 +125,13 @@ class JobQueueDB extends JobQueue {
                $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
                $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
                try {
+                       // Occasionally recycle jobs back into the queue that have been claimed too long
+                       if ( mt_rand( 0, 99 ) == 0 ) {
+                               $this->recycleStaleJobs();
+                       }
                        do { // retry when our row is invalid or deleted as a duplicate
                                // Try to reserve a row in the DB...
-                               if ( $this->order === 'timestamp' ) { // oldest first
+                               if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
                                        $row = $this->claimOldest( $uuid );
                                } else { // random first
                                        $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
@@ -199,7 +205,10 @@ class JobQueueDB extends JobQueue {
                        );
                        if ( $row ) { // claim the job
                                $dbw->update( 'job', // update by PK
-                                       array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+                                       array(
+                                               'job_token'           => $uuid,
+                                               'job_token_timestamp' => $dbw->timestamp(),
+                                               'job_attempts = job_attempts+1' ),
                                        array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
                                        __METHOD__
                                );
@@ -232,26 +241,30 @@ class JobQueueDB extends JobQueue {
                                // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
                                // Oracle and Postgre have no such limitation. However, MySQL offers an
                                // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
-                               $dbw->query( "UPDATE {$dbw->tableName( 'job' )}
-                                       SET
-                                               job_token = {$dbw->addQuotes( $uuid ) },
-                                               job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}
-                                       WHERE (
-                                               job_cmd = {$dbw->addQuotes( $this->type )}
-                                               AND job_token = {$dbw->addQuotes( '' )}
-                                       ) ORDER BY job_random ASC LIMIT 1",
+                               $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
+                                       "SET " .
+                                               "job_token = {$dbw->addQuotes( $uuid ) }, " .
+                                               "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
+                                               "job_attempts = job_attempts+1 " .
+                                       "WHERE ( " .
+                                               "job_cmd = {$dbw->addQuotes( $this->type )} " .
+                                               "AND job_token = {$dbw->addQuotes( '' )} " .
+                                       ") ORDER BY job_id ASC LIMIT 1",
                                        __METHOD__
                                );
                        } else {
                                // Use a subquery to find the job, within an UPDATE to claim it.
                                // This uses as much of the DB wrapper functions as possible.
                                $dbw->update( 'job',
-                                       array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+                                       array(
+                                               'job_token'           => $uuid,
+                                               'job_token_timestamp' => $dbw->timestamp(),
+                                               'job_attempts = job_attempts+1' ),
                                        array( 'job_id = (' .
                                                $dbw->selectSQLText( 'job', 'job_id',
                                                        array( 'job_cmd' => $this->type, 'job_token' => '' ),
                                                        __METHOD__,
-                                                       array( 'ORDER BY' => 'job_random ASC', 'LIMIT' => 1 ) ) .
+                                                       array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
                                                ')'
                                        ),
                                        __METHOD__
@@ -273,6 +286,51 @@ class JobQueueDB extends JobQueue {
                return $row;
        }
 
+       /**
+        * Recycle or destroy any jobs that have been claimed for too long
+        *
+        * @return integer Number of jobs recycled/deleted
+        */
+       protected function recycleStaleJobs() {
+               $now   = time();
+               $dbw   = $this->getMasterDB();
+               $count = 0; // affected rows
+
+               if ( $this->claimTTL > 0 ) { // re-try stale jobs...
+                       $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
+                       // Reset job_token for these jobs so that other runners will pick them up.
+                       // Set the timestamp to the current time, as it is useful to now that the job
+                       // was already tried before.
+                       $dbw->update( 'job',
+                               array(
+                                       'job_token' => '',
+                                       'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
+                               array(
+                                       'job_cmd' => $this->type,
+                                       "job_token != {$dbw->addQuotes( '' )}", // was acquired
+                                       "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
+                                       "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ),
+                               __METHOD__
+                       );
+                       $count += $dbw->affectedRows();
+               }
+
+               // Just destroy stale jobs...
+               $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
+               $conds = array(
+                       'job_cmd' => $this->type,
+                       "job_token != {$dbw->addQuotes( '' )}", // was acquired
+                       "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
+               );
+               if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
+                       $conds[] = "job_attempts >= {$dbw->addQuotes( self::MAX_ATTEMPTS )}";
+               }
+               $dbw->delete( 'job', $conds, __METHOD__ );
+               $count += $dbw->affectedRows();
+
+               return $count;
+       }
+
        /**
         * @see JobQueue::doAck()
         * @return Job|bool
@@ -330,17 +388,12 @@ class JobQueueDB extends JobQueue {
                        'job_params'    => self::makeBlob( $job->getParams() ),
                );
                // Additional job metadata
-               if ( $this->order === 'timestamp' ) { // oldest first
-                       $random = time() - 1325376000; // seconds since "January 1, 2012"
-               } else { // random first
-                       $random = mt_rand( 0, self::MAX_JOB_RANDOM );
-               }
                $dbw = $this->getMasterDB();
                $metaFields = array(
                        'job_id'        => $dbw->nextSequenceValue( 'job_job_id_seq' ),
                        'job_timestamp' => $dbw->timestamp(),
                        'job_sha1'      => wfBaseConvert( sha1( serialize( $descFields ) ), 16, 36, 32 ),
-                       'job_random'    => $random
+                       'job_random'    => mt_rand( 0, self::MAX_JOB_RANDOM )
                );
                return ( $descFields + $metaFields );
        }