[JobQueue] Added support for retrying jobs when runners die.
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 24 Oct 2012 00:29:54 +0000 (17:29 -0700)
committerGerrit Code Review <gerrit@wikimedia.org>
Wed, 14 Nov 2012 01:25:03 +0000 (01:25 +0000)
* This adds a new 'claimTTL' setting which recycles jobs into the queue
  if they have been claimed for longer than that duration (in seconds).
* This also purges jobs for such cases when job retries are not enabled.
* This adds a new job_attempts column and adds an index to improve the
  query and semantics for FIFO ordered queues.

Change-Id: Idb6217a694d86a4d6fc881768deed424628f015d

includes/installer/MysqlUpdater.php
includes/installer/SqliteUpdater.php
includes/job/JobQueue.php
includes/job/JobQueueDB.php
maintenance/archives/patch-job_attempts.sql [new file with mode: 0644]
maintenance/tables.sql

index 3f1dad9..7cf382b 100644 (file)
@@ -225,6 +225,7 @@ class MysqlUpdater extends DatabaseUpdater {
                        array( 'addTable', 'sites',                            'patch-sites.sql' ),
                        array( 'addField', 'filearchive',   'fa_sha1',          'patch-fa_sha1.sql' ),
                        array( 'addField', 'job',           'job_token',         'patch-job_token.sql' ),
+                       array( 'addField', 'job',           'job_attempts',       'patch-job_attempts.sql' ),
                );
        }
 
index 2fa3f31..3921def 100644 (file)
@@ -105,6 +105,7 @@ class SqliteUpdater extends DatabaseUpdater {
                        array( 'addTable', 'sites',                            'patch-sites.sql' ),
                        array( 'addField', 'filearchive',   'fa_sha1',          'patch-fa_sha1.sql' ),
                        array( 'addField', 'job',           'job_token',         'patch-job_token.sql' ),
+                       array( 'addField', 'job',           'job_attempts',      'patch-job_attempts.sql' ),
                );
        }
 
index 21ef6d3..aa3d6e2 100644 (file)
@@ -32,6 +32,7 @@ abstract class JobQueue {
        protected $wiki; // string; wiki ID
        protected $type; // string; job type
        protected $order; // string; job priority for pop()
+       protected $claimTTL; // integer; seconds
 
        const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions
 
@@ -39,23 +40,28 @@ abstract class JobQueue {
         * @param $params array
         */
        protected function __construct( array $params ) {
-               $this->wiki  = $params['wiki'];
-               $this->type  = $params['type'];
-               $this->order = isset( $params['order'] ) ? $params['order'] : 'random';
+               $this->wiki     = $params['wiki'];
+               $this->type     = $params['type'];
+               $this->order    = isset( $params['order'] ) ? $params['order'] : 'random';
+               $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
        }
 
        /**
         * Get a job queue object of the specified type.
         * $params includes:
-        *     class : What job class to use (determines job type)
-        *     wiki  : wiki ID of the wiki the jobs are for (defaults to current wiki)
-        *     type  : The name of the job types this queue handles
-        *     order : Order that pop() selects jobs, either "timestamp" or "random".
-        *             If "timestamp" is used, the queue will effectively be FIFO. Note that
-        *             pop() will not be exactly FIFO, and even if it was, job completion would
-        *             not appear to be exactly FIFO since jobs can take different times to finish.
-        *             If "random" is used, pop() will pick jobs in random order. This might be
-        *             useful for improving concurrency depending on the queue storage medium.
+        *   class    : What job class to use (determines job type)
+        *   wiki     : wiki ID of the wiki the jobs are for (defaults to current wiki)
+        *   type     : The name of the job types this queue handles
+        *   order    : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
+        *              If "fifo" is used, the queue will effectively be FIFO. Note that
+        *              job completion will not appear to be exactly FIFO if there are multiple
+        *              job runners since jobs can take different times to finish once popped.
+        *              If "timestamp" is used, the queue will at least be loosely ordered
+        *              by timestamp, allowing for some jobs to be popped off out of order.
+        *              If "random" is used, pop() will pick jobs in random order. This might be
+        *              useful for improving concurrency depending on the queue storage medium.
+        *   claimTTL : If supported, the queue will recycle jobs that have been popped
+        *              but not acknowledged as completed after this many seconds.
         *
         * @param $params array
         * @return JobQueue
index 223ef41..0ce4296 100644 (file)
  * @since 1.21
  */
 class JobQueueDB extends JobQueue {
-       const CACHE_TTL      = 300; // integer; seconds
-       const MAX_JOB_RANDOM = 2147483647; // 2^31 - 1; used for job_random
+       const CACHE_TTL      = 300; // integer; seconds to cache queue information
+       const MAX_AGE_PRUNE  = 604800; // integer; seconds a job can live once claimed
+       const MAX_ATTEMPTS   = 3; // integer; number of times to try a job
+       const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
 
        /**
         * @see JobQueue::doIsEmpty()
@@ -123,9 +125,13 @@ class JobQueueDB extends JobQueue {
                $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
                $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
                try {
+                       // Occasionally recycle jobs back into the queue that have been claimed too long
+                       if ( mt_rand( 0, 99 ) == 0 ) {
+                               $this->recycleStaleJobs();
+                       }
                        do { // retry when our row is invalid or deleted as a duplicate
                                // Try to reserve a row in the DB...
-                               if ( $this->order === 'timestamp' ) { // oldest first
+                               if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
                                        $row = $this->claimOldest( $uuid );
                                } else { // random first
                                        $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
@@ -199,7 +205,10 @@ class JobQueueDB extends JobQueue {
                        );
                        if ( $row ) { // claim the job
                                $dbw->update( 'job', // update by PK
-                                       array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+                                       array(
+                                               'job_token'           => $uuid,
+                                               'job_token_timestamp' => $dbw->timestamp(),
+                                               'job_attempts = job_attempts+1' ),
                                        array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
                                        __METHOD__
                                );
@@ -232,26 +241,30 @@ class JobQueueDB extends JobQueue {
                                // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
                                // Oracle and Postgre have no such limitation. However, MySQL offers an
                                // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
-                               $dbw->query( "UPDATE {$dbw->tableName( 'job' )}
-                                       SET
-                                               job_token = {$dbw->addQuotes( $uuid ) },
-                                               job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}
-                                       WHERE (
-                                               job_cmd = {$dbw->addQuotes( $this->type )}
-                                               AND job_token = {$dbw->addQuotes( '' )}
-                                       ) ORDER BY job_random ASC LIMIT 1",
+                               $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
+                                       "SET " .
+                                               "job_token = {$dbw->addQuotes( $uuid ) }, " .
+                                               "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
+                                               "job_attempts = job_attempts+1 " .
+                                       "WHERE ( " .
+                                               "job_cmd = {$dbw->addQuotes( $this->type )} " .
+                                               "AND job_token = {$dbw->addQuotes( '' )} " .
+                                       ") ORDER BY job_id ASC LIMIT 1",
                                        __METHOD__
                                );
                        } else {
                                // Use a subquery to find the job, within an UPDATE to claim it.
                                // This uses as much of the DB wrapper functions as possible.
                                $dbw->update( 'job',
-                                       array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+                                       array(
+                                               'job_token'           => $uuid,
+                                               'job_token_timestamp' => $dbw->timestamp(),
+                                               'job_attempts = job_attempts+1' ),
                                        array( 'job_id = (' .
                                                $dbw->selectSQLText( 'job', 'job_id',
                                                        array( 'job_cmd' => $this->type, 'job_token' => '' ),
                                                        __METHOD__,
-                                                       array( 'ORDER BY' => 'job_random ASC', 'LIMIT' => 1 ) ) .
+                                                       array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
                                                ')'
                                        ),
                                        __METHOD__
@@ -273,6 +286,47 @@ class JobQueueDB extends JobQueue {
                return $row;
        }
 
+       /**
+        * Recycle or destroy any jobs that have been claimed for too long
+        *
+        * @return integer Number of jobs recycled/deleted
+        */
+       protected function recycleStaleJobs() {
+               $now    = time();
+               $dbw    = $this->getMasterDB();
+
+               if ( $this->claimTTL > 0 ) { // re-try stale jobs...
+                       $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
+                       // Reset job_token for these jobs so that other runners will pick them up.
+                       // Set the timestamp to the current time, as it is useful to now that the job
+                       // was already tried before.
+                       $dbw->update( 'job',
+                               array(
+                                       'job_token' => '',
+                                       'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
+                               array(
+                                       'job_cmd' => $this->type,
+                                       "job_token != {$dbw->addQuotes( '' )}", // was acquired
+                                       "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
+                                       "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ),
+                               __METHOD__
+                       );
+               }
+
+               // Just destroy stale jobs...
+               $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
+               $conds = array(
+                       'job_cmd' => $this->type,
+                       "job_token != {$dbw->addQuotes( '' )}", // was acquired
+                       "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
+               );
+               if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
+                       $conds[] = "job_attempts >= {$dbw->addQuotes( self::MAX_ATTEMPTS )}";
+               }
+
+               return $dbw->affectedRows();
+       }
+
        /**
         * @see JobQueue::doAck()
         * @return Job|bool
@@ -330,17 +384,12 @@ class JobQueueDB extends JobQueue {
                        'job_params'    => self::makeBlob( $job->getParams() ),
                );
                // Additional job metadata
-               if ( $this->order === 'timestamp' ) { // oldest first
-                       $random = time() - 1325376000; // seconds since "January 1, 2012"
-               } else { // random first
-                       $random = mt_rand( 0, self::MAX_JOB_RANDOM );
-               }
                $dbw = $this->getMasterDB();
                $metaFields = array(
                        'job_id'        => $dbw->nextSequenceValue( 'job_job_id_seq' ),
                        'job_timestamp' => $dbw->timestamp(),
                        'job_sha1'      => wfBaseConvert( sha1( serialize( $descFields ) ), 16, 36, 32 ),
-                       'job_random'    => $random
+                       'job_random'    => mt_rand( 0, self::MAX_JOB_RANDOM )
                );
                return ( $descFields + $metaFields );
        }
diff --git a/maintenance/archives/patch-job_attempts.sql b/maintenance/archives/patch-job_attempts.sql
new file mode 100644 (file)
index 0000000..47b73e8
--- /dev/null
@@ -0,0 +1,4 @@
+ALTER TABLE /*_*/job
+    ADD COLUMN job_attempts integer unsigned NOT NULL default 0;
+
+CREATE INDEX /*i*/job_cmd_token_id ON /*_*/job (job_cmd,job_token,job_id);
index bdcd66e..aae5042 100644 (file)
@@ -1291,10 +1291,12 @@ CREATE TABLE /*_*/job (
   -- Stored as a PHP serialized array, or an empty string if there are no parameters
   job_params blob NOT NULL,
 
-  -- Random, non-unique, number used for job acquisition
-  -- Either a simple timestamp or a totally random number (for lock concurrency)
+  -- Random, non-unique, number used for job acquisition (for lock concurrency)
   job_random integer unsigned NOT NULL default 0,
 
+  -- The number of times this job has been locked
+  job_attempts integer unsigned NOT NULL default 0,
+
   -- Field that conveys process locks on rows via process UUIDs
   job_token varbinary(32) NOT NULL default '',
 
@@ -1307,6 +1309,7 @@ CREATE TABLE /*_*/job (
 
 CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1);
 CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random);
+CREATE INDEX /*i*/job_cmd_token_id ON /*_*/job (job_cmd,job_token,job_id);
 CREATE INDEX /*i*/job_cmd ON /*_*/job (job_cmd, job_namespace, job_title, job_params(128));
 CREATE INDEX /*i*/job_timestamp ON /*_*/job (job_timestamp);