Merge "Made SSL validation in Curl HTTP requests the default."
[lhc/web/wiklou.git] / includes / job / JobQueueDB.php
index bea4a6f..cbb2391 100644 (file)
  * Class to handle job queues stored in the DB
  *
  * @ingroup JobQueue
- * @since 1.20
+ * @since 1.21
  */
 class JobQueueDB extends JobQueue {
-       const CACHE_TTL      = 30; // integer; seconds
-       const MAX_JOB_RANDOM = 2147483647; // 2^31 - 1; used for job_random
+       const CACHE_TTL      = 300; // integer; seconds to cache queue information
+       const MAX_AGE_PRUNE  = 604800; // integer; seconds a job can live once claimed
+       const MAX_ATTEMPTS   = 3; // integer; number of times to try a job
+       const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
 
        /**
         * @see JobQueue::doIsEmpty()
@@ -97,7 +99,7 @@ class JobQueueDB extends JobQueue {
                                        $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore automatic begin()
                                }
 
-                               $wgMemc->set( $key, 'false', $ttl );
+                               $wgMemc->set( $key, 'false', $ttl ); // queue is not empty
                        } );
                }
 
@@ -111,48 +113,57 @@ class JobQueueDB extends JobQueue {
        protected function doPop() {
                global $wgMemc;
 
-               $uuid = wfRandomString( 32 ); // pop attempt
+               if ( $wgMemc->get( $this->getEmptinessCacheKey() ) === 'true' ) {
+                       return false; // queue is empty
+               }
 
                $dbw = $this->getMasterDB();
                $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
 
+               $uuid = wfRandomString( 32 ); // pop attempt
                $job = false; // job popped off
                $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
                $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
                try {
+                       // Occasionally recycle jobs back into the queue that have been claimed too long
+                       if ( mt_rand( 0, 99 ) == 0 ) {
+                               $this->recycleStaleJobs();
+                       }
                        do { // retry when our row is invalid or deleted as a duplicate
-                               $row = false; // row claimed
-                               $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
-                               $gte  = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
-                               // Try to reserve a DB row...
-                               if ( $this->claim( $uuid, $rand, $gte ) || $this->claim( $uuid, $rand, !$gte ) ) {
-                                       // Fetch any row that we just reserved...
-                                       $row = $dbw->selectRow( 'job', '*',
-                                               array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__ );
-                                       // Check if another process deleted it as a duplicate
-                                       if ( !$row ) {
-                                               wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
-                                               continue; // try again
-                                       }
-                                       // Get the job object from the row...
-                                       $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
-                                       if ( !$title ) {
-                                               $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
-                                               wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
-                                               continue; // try again
+                               // Try to reserve a row in the DB...
+                               if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
+                                       $row = $this->claimOldest( $uuid );
+                               } else { // random first
+                                       $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+                                       $gte  = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+                                       $row  = $this->claimRandom( $uuid, $rand, $gte );
+                                       if ( !$row ) { // need to try the other direction
+                                               $row = $this->claimRandom( $uuid, $rand, !$gte );
                                        }
-                                       $job = Job::factory( $row->job_cmd, $title,
-                                               self::extractBlob( $row->job_params ), $row->job_id );
-                                       // Delete any *other* duplicate jobs in the queue...
-                                       if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
-                                               $dbw->delete( 'job',
-                                                       array( 'job_sha1' => $row->job_sha1,
-                                                               "job_id != {$dbw->addQuotes( $row->job_id )}" ),
-                                                       __METHOD__
-                                               );
-                                       }
-                               } else {
+                               }
+                               // Check if we found a row to reserve...
+                               if ( !$row ) {
                                        $wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL );
+                                       break; // nothing to do
+                               }
+                               // Get the job object from the row...
+                               $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
+                               if ( !$title ) {
+                                       $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
+                                       wfIncrStats( 'job-pop' );
+                                       wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
+                                       continue; // try again
+                               }
+                               $job = Job::factory( $row->job_cmd, $title,
+                                       self::extractBlob( $row->job_params ), $row->job_id );
+                               // Delete any *other* duplicate jobs in the queue...
+                               if ( $job->ignoreDuplicates() && strlen( $row->job_sha1 ) ) {
+                                       $dbw->delete( 'job',
+                                               array( 'job_sha1' => $row->job_sha1,
+                                                       "job_id != {$dbw->addQuotes( $row->job_id )}" ),
+                                               __METHOD__
+                                       );
+                                       wfIncrStats( 'job-pop', $dbw->affectedRows() );
                                }
                                break; // done
                        } while( true );
@@ -167,51 +178,157 @@ class JobQueueDB extends JobQueue {
 
        /**
         * Reserve a row with a single UPDATE without holding row locks over RTTs...
+        *
         * @param $uuid string 32 char hex string
         * @param $rand integer Random unsigned integer (31 bits)
         * @param $gte bool Search for job_random >= $random (otherwise job_random <= $random)
-        * @return integer Number of affected rows
+        * @return Row|false
         */
-       protected function claim( $uuid, $rand, $gte ) {
+       protected function claimRandom( $uuid, $rand, $gte ) {
                $dbw  = $this->getMasterDB();
                $dir  = $gte ? 'ASC' : 'DESC';
                $ineq = $gte ? '>=' : '<=';
-               if ( $dbw->getType() === 'mysql' ) {
-                       // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
-                       // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
-                       // Oracle and Postgre have no such limitation. However, MySQL offers an
-                       // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
-                       // The DB wrapper functions do not support this, so it's done manually.
-                       $dbw->query( "UPDATE {$dbw->tableName( 'job' )}
-                               SET
-                                       job_token = {$dbw->addQuotes( $uuid ) },
-                                       job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}
-                               WHERE (
-                                       job_cmd = {$dbw->addQuotes( $this->type )}
-                                       AND job_token = {$dbw->addQuotes( '' )}
-                                       AND job_random {$ineq} {$dbw->addQuotes( $rand )}
-                               ) ORDER BY job_random {$dir} LIMIT 1",
-                               __METHOD__
+
+               $row = false; // the row acquired
+               // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
+               // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
+               // not replication safe. Due to http://bugs.mysql.com/bug.php?id=6980, subqueries cannot
+               // be used here with MySQL.
+               do {
+                       $row = $dbw->selectRow( 'job', '*', // find a random job
+                               array(
+                                       'job_cmd'   => $this->type,
+                                       'job_token' => '',
+                                       "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
+                               __METHOD__,
+                               array( 'ORDER BY' => "job_random {$dir}" )
                        );
-               } else {
-                       // Use a subquery to find the job, within an UPDATE to claim it.
-                       // This uses as much of the DB wrapper functions as possible.
+                       if ( $row ) { // claim the job
+                               $dbw->update( 'job', // update by PK
+                                       array(
+                                               'job_token'           => $uuid,
+                                               'job_token_timestamp' => $dbw->timestamp(),
+                                               'job_attempts = job_attempts+1' ),
+                                       array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
+                                       __METHOD__
+                               );
+                               // This might get raced out by another runner when claiming the previously
+                               // selected row. The use of job_random should minimize this problem, however.
+                               if ( !$dbw->affectedRows() ) {
+                                       $row = false; // raced out
+                               }
+                       } else {
+                               break; // nothing to do
+                       }
+               } while ( !$row );
+
+               return $row;
+       }
+
+       /**
+        * Reserve a row with a single UPDATE without holding row locks over RTTs...
+        *
+        * @param $uuid string 32 char hex string
+        * @return Row|false
+        */
+       protected function claimOldest( $uuid ) {
+               $dbw  = $this->getMasterDB();
+
+               $row = false; // the row acquired
+               do {
+                       if ( $dbw->getType() === 'mysql' ) {
+                               // Per http://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
+                               // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
+                               // Oracle and Postgre have no such limitation. However, MySQL offers an
+                               // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
+                               $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
+                                       "SET " .
+                                               "job_token = {$dbw->addQuotes( $uuid ) }, " .
+                                               "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
+                                               "job_attempts = job_attempts+1 " .
+                                       "WHERE ( " .
+                                               "job_cmd = {$dbw->addQuotes( $this->type )} " .
+                                               "AND job_token = {$dbw->addQuotes( '' )} " .
+                                       ") ORDER BY job_id ASC LIMIT 1",
+                                       __METHOD__
+                               );
+                       } else {
+                               // Use a subquery to find the job, within an UPDATE to claim it.
+                               // This uses as much of the DB wrapper functions as possible.
+                               $dbw->update( 'job',
+                                       array(
+                                               'job_token'           => $uuid,
+                                               'job_token_timestamp' => $dbw->timestamp(),
+                                               'job_attempts = job_attempts+1' ),
+                                       array( 'job_id = (' .
+                                               $dbw->selectSQLText( 'job', 'job_id',
+                                                       array( 'job_cmd' => $this->type, 'job_token' => '' ),
+                                                       __METHOD__,
+                                                       array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
+                                               ')'
+                                       ),
+                                       __METHOD__
+                               );
+                       }
+                       // Fetch any row that we just reserved...
+                       if ( $dbw->affectedRows() ) {
+                               $row = $dbw->selectRow( 'job', '*',
+                                       array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__
+                               );
+                               if ( !$row ) { // raced out by duplicate job removal
+                                       wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
+                               }
+                       } else {
+                               break; // nothing to do
+                       }
+               } while ( !$row );
+
+               return $row;
+       }
+
+       /**
+        * Recycle or destroy any jobs that have been claimed for too long
+        *
+        * @return integer Number of jobs recycled/deleted
+        */
+       protected function recycleStaleJobs() {
+               $now   = time();
+               $dbw   = $this->getMasterDB();
+               $count = 0; // affected rows
+
+               if ( $this->claimTTL > 0 ) { // re-try stale jobs...
+                       $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
+                       // Reset job_token for these jobs so that other runners will pick them up.
+                       // Set the timestamp to the current time, as it is useful to now that the job
+                       // was already tried before.
                        $dbw->update( 'job',
-                               array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
-                               array( 'job_id = (' .
-                                       $dbw->selectSQLText( 'job', 'job_id',
-                                               array(
-                                                       'job_cmd'   => $this->type,
-                                                       'job_token' => '',
-                                                       "job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
-                                               __METHOD__,
-                                               array( 'ORDER BY' => "job_random {$dir}", 'LIMIT' => 1 ) ) .
-                                       ')'
-                               ),
+                               array(
+                                       'job_token' => '',
+                                       'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
+                               array(
+                                       'job_cmd' => $this->type,
+                                       "job_token != {$dbw->addQuotes( '' )}", // was acquired
+                                       "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
+                                       "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ),
                                __METHOD__
                        );
+                       $count += $dbw->affectedRows();
                }
-               return $dbw->affectedRows();
+
+               // Just destroy stale jobs...
+               $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
+               $conds = array(
+                       'job_cmd' => $this->type,
+                       "job_token != {$dbw->addQuotes( '' )}", // was acquired
+                       "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
+               );
+               if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
+                       $conds[] = "job_attempts >= {$dbw->addQuotes( self::MAX_ATTEMPTS )}";
+               }
+               $dbw->delete( 'job', $conds, __METHOD__ );
+               $count += $dbw->affectedRows();
+
+               return $count;
        }
 
        /**
@@ -220,10 +337,7 @@ class JobQueueDB extends JobQueue {
         */
        protected function doAck( Job $job ) {
                $dbw = $this->getMasterDB();
-               if ( $dbw->trxLevel() ) {
-                       wfWarn( "Attempted to ack a job in a transaction; committing first." );
-                       $dbw->commit(); // push existing transaction
-               }
+               $dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
 
                $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
                $dbw->clearFlag( DBO_TRX ); // make each query its own transaction