From: Aaron Schulz Date: Wed, 24 Oct 2012 00:29:54 +0000 (-0700) Subject: [JobQueue] Added support for retrying jobs when runners die. X-Git-Tag: 1.31.0-rc.0~21645 X-Git-Url: https://git.heureux-cyclage.org/?p=lhc%2Fweb%2Fwiklou.git;a=commitdiff_plain;h=cd000590ee694af73edb05b5e37b60b5e2b77fd5 [JobQueue] Added support for retrying jobs when runners die. * This adds a new 'claimTTL' setting which recycles jobs into the queue if they have been claimed for longer than that duration (in seconds). * This also purges jobs for such cases when job retries are not enabled. * This adds a new job_attempts column and adds an index to improve the query and semantics for FIFO ordered queues. Change-Id: Idb6217a694d86a4d6fc881768deed424628f015d --- diff --git a/includes/installer/MysqlUpdater.php b/includes/installer/MysqlUpdater.php index 3f1dad9f2e..7cf382bcee 100644 --- a/includes/installer/MysqlUpdater.php +++ b/includes/installer/MysqlUpdater.php @@ -225,6 +225,7 @@ class MysqlUpdater extends DatabaseUpdater { array( 'addTable', 'sites', 'patch-sites.sql' ), array( 'addField', 'filearchive', 'fa_sha1', 'patch-fa_sha1.sql' ), array( 'addField', 'job', 'job_token', 'patch-job_token.sql' ), + array( 'addField', 'job', 'job_attempts', 'patch-job_attempts.sql' ), ); } diff --git a/includes/installer/SqliteUpdater.php b/includes/installer/SqliteUpdater.php index 2fa3f31c76..3921defafb 100644 --- a/includes/installer/SqliteUpdater.php +++ b/includes/installer/SqliteUpdater.php @@ -105,6 +105,7 @@ class SqliteUpdater extends DatabaseUpdater { array( 'addTable', 'sites', 'patch-sites.sql' ), array( 'addField', 'filearchive', 'fa_sha1', 'patch-fa_sha1.sql' ), array( 'addField', 'job', 'job_token', 'patch-job_token.sql' ), + array( 'addField', 'job', 'job_attempts', 'patch-job_attempts.sql' ), ); } diff --git a/includes/job/JobQueue.php b/includes/job/JobQueue.php index 21ef6d34e1..aa3d6e2a4b 100644 --- a/includes/job/JobQueue.php +++ b/includes/job/JobQueue.php @@ -32,6 +32,7 @@ 
abstract class JobQueue { protected $wiki; // string; wiki ID protected $type; // string; job type protected $order; // string; job priority for pop() + protected $claimTTL; // integer; seconds const QoS_Atomic = 1; // integer; "all-or-nothing" job insertions @@ -39,23 +40,28 @@ abstract class JobQueue { * @param $params array */ protected function __construct( array $params ) { - $this->wiki = $params['wiki']; - $this->type = $params['type']; - $this->order = isset( $params['order'] ) ? $params['order'] : 'random'; + $this->wiki = $params['wiki']; + $this->type = $params['type']; + $this->order = isset( $params['order'] ) ? $params['order'] : 'random'; + $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0; } /** * Get a job queue object of the specified type. * $params includes: - * class : What job class to use (determines job type) - * wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) - * type : The name of the job types this queue handles - * order : Order that pop() selects jobs, either "timestamp" or "random". - * If "timestamp" is used, the queue will effectively be FIFO. Note that - * pop() will not be exactly FIFO, and even if it was, job completion would - * not appear to be exactly FIFO since jobs can take different times to finish. - * If "random" is used, pop() will pick jobs in random order. This might be - * useful for improving concurrency depending on the queue storage medium. + * class : What job class to use (determines job type) + * wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * type : The name of the job types this queue handles + * order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". + * If "fifo" is used, the queue will effectively be FIFO. Note that + * job completion will not appear to be exactly FIFO if there are multiple + * job runners since jobs can take different times to finish once popped. 
+ * If "timestamp" is used, the queue will at least be loosely ordered + * by timestamp, allowing for some jobs to be popped off out of order. + * If "random" is used, pop() will pick jobs in random order. This might be + * useful for improving concurrency depending on the queue storage medium. + * claimTTL : If supported, the queue will recycle jobs that have been popped + * but not acknowledged as completed after this many seconds. * * @param $params array * @return JobQueue diff --git a/includes/job/JobQueueDB.php b/includes/job/JobQueueDB.php index 223ef41fe8..0ce42961c1 100644 --- a/includes/job/JobQueueDB.php +++ b/includes/job/JobQueueDB.php @@ -28,8 +28,10 @@ * @since 1.21 */ class JobQueueDB extends JobQueue { - const CACHE_TTL = 300; // integer; seconds - const MAX_JOB_RANDOM = 2147483647; // 2^31 - 1; used for job_random + const CACHE_TTL = 300; // integer; seconds to cache queue information + const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed + const MAX_ATTEMPTS = 3; // integer; number of times to try a job + const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random /** * @see JobQueue::doIsEmpty() @@ -123,9 +125,13 @@ class JobQueueDB extends JobQueue { $autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled? $dbw->clearFlag( DBO_TRX ); // make each query its own transaction try { + // Occasionally recycle jobs back into the queue that have been claimed too long + if ( mt_rand( 0, 99 ) == 0 ) { + $this->recycleStaleJobs(); + } do { // retry when our row is invalid or deleted as a duplicate // Try to reserve a row in the DB... 
- if ( $this->order === 'timestamp' ) { // oldest first + if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) { $row = $this->claimOldest( $uuid ); } else { // random first $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs @@ -199,7 +205,10 @@ class JobQueueDB extends JobQueue { ); if ( $row ) { // claim the job $dbw->update( 'job', // update by PK - array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ), + array( + 'job_token' => $uuid, + 'job_token_timestamp' => $dbw->timestamp(), + 'job_attempts = job_attempts+1' ), array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ), __METHOD__ ); @@ -232,26 +241,30 @@ class JobQueueDB extends JobQueue { // same table being changed in an UPDATE query in MySQL (gives Error: 1093). // Oracle and Postgre have no such limitation. However, MySQL offers an // alternative here by supporting ORDER BY + LIMIT for UPDATE queries. - $dbw->query( "UPDATE {$dbw->tableName( 'job' )} - SET - job_token = {$dbw->addQuotes( $uuid ) }, - job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )} - WHERE ( - job_cmd = {$dbw->addQuotes( $this->type )} - AND job_token = {$dbw->addQuotes( '' )} - ) ORDER BY job_random ASC LIMIT 1", + $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " . + "SET " . + "job_token = {$dbw->addQuotes( $uuid ) }, " . + "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " . + "job_attempts = job_attempts+1 " . + "WHERE ( " . + "job_cmd = {$dbw->addQuotes( $this->type )} " . + "AND job_token = {$dbw->addQuotes( '' )} " . + ") ORDER BY job_id ASC LIMIT 1", __METHOD__ ); } else { // Use a subquery to find the job, within an UPDATE to claim it. // This uses as much of the DB wrapper functions as possible. 
$dbw->update( 'job', - array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ), + array( + 'job_token' => $uuid, + 'job_token_timestamp' => $dbw->timestamp(), + 'job_attempts = job_attempts+1' ), array( 'job_id = (' . $dbw->selectSQLText( 'job', 'job_id', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__, - array( 'ORDER BY' => 'job_random ASC', 'LIMIT' => 1 ) ) . + array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) . ')' ), __METHOD__ @@ -273,6 +286,47 @@ class JobQueueDB extends JobQueue { return $row; } + /** + * Recycle or destroy any jobs that have been claimed for too long + * + * @return integer Number of jobs recycled/deleted + */ + protected function recycleStaleJobs() { + $now = time(); + $dbw = $this->getMasterDB(); + + if ( $this->claimTTL > 0 ) { // re-try stale jobs... + $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); + // Reset job_token for these jobs so that other runners will pick them up. + // Set the timestamp to the current time, as it is useful to know that the job + // was already tried before. + $dbw->update( 'job', + array( + 'job_token' => '', + 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release + array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale + "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ), + __METHOD__ + ); + } + + // Just destroy stale jobs... + $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); + $conds = array( + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale + ); + if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... 
+ $conds[] = "job_attempts >= {$dbw->addQuotes( self::MAX_ATTEMPTS )}"; + } + + return $dbw->affectedRows(); + } + /** * @see JobQueue::doAck() * @return Job|bool @@ -330,17 +384,12 @@ class JobQueueDB extends JobQueue { 'job_params' => self::makeBlob( $job->getParams() ), ); // Additional job metadata - if ( $this->order === 'timestamp' ) { // oldest first - $random = time() - 1325376000; // seconds since "January 1, 2012" - } else { // random first - $random = mt_rand( 0, self::MAX_JOB_RANDOM ); - } $dbw = $this->getMasterDB(); $metaFields = array( 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ), 'job_timestamp' => $dbw->timestamp(), 'job_sha1' => wfBaseConvert( sha1( serialize( $descFields ) ), 16, 36, 32 ), - 'job_random' => $random + 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) ); return ( $descFields + $metaFields ); } diff --git a/maintenance/archives/patch-job_attempts.sql b/maintenance/archives/patch-job_attempts.sql new file mode 100644 index 0000000000..47b73e81bd --- /dev/null +++ b/maintenance/archives/patch-job_attempts.sql @@ -0,0 +1,4 @@ +ALTER TABLE /*_*/job + ADD COLUMN job_attempts integer unsigned NOT NULL default 0; + +CREATE INDEX /*i*/job_cmd_token_id ON /*_*/job (job_cmd,job_token,job_id); diff --git a/maintenance/tables.sql b/maintenance/tables.sql index bdcd66eda5..aae5042418 100644 --- a/maintenance/tables.sql +++ b/maintenance/tables.sql @@ -1291,10 +1291,12 @@ CREATE TABLE /*_*/job ( -- Stored as a PHP serialized array, or an empty string if there are no parameters job_params blob NOT NULL, - -- Random, non-unique, number used for job acquisition - -- Either a simple timestamp or a totally random number (for lock concurrency) + -- Random, non-unique, number used for job acquisition (for lock concurrency) job_random integer unsigned NOT NULL default 0, + -- The number of times this job has been locked + job_attempts integer unsigned NOT NULL default 0, + -- Field that conveys process locks on rows via process UUIDs 
job_token varbinary(32) NOT NULL default '', @@ -1307,6 +1309,7 @@ CREATE TABLE /*_*/job ( CREATE INDEX /*i*/job_sha1 ON /*_*/job (job_sha1); CREATE INDEX /*i*/job_cmd_token ON /*_*/job (job_cmd,job_token,job_random); +CREATE INDEX /*i*/job_cmd_token_id ON /*_*/job (job_cmd,job_token,job_id); CREATE INDEX /*i*/job_cmd ON /*_*/job (job_cmd, job_namespace, job_title, job_params(128)); CREATE INDEX /*i*/job_timestamp ON /*_*/job (job_timestamp);