* @since 1.21
*/
class JobQueueDB extends JobQueue {
- const CACHE_TTL = 300; // integer; seconds
- const MAX_JOB_RANDOM = 2147483647; // 2^31 - 1; used for job_random
+ const CACHE_TTL = 300; // integer; seconds to cache queue information
+ const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
+ const MAX_ATTEMPTS = 3; // integer; number of times to try a job
+ const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
/**
* @see JobQueue::doIsEmpty()
$autoTrx = $dbw->getFlag( DBO_TRX ); // automatic begin() enabled?
$dbw->clearFlag( DBO_TRX ); // make each query its own transaction
try {
+ // Occasionally recycle jobs back into the queue that have been claimed too long
+ if ( mt_rand( 0, 99 ) == 0 ) {
+ $this->recycleStaleJobs();
+ }
do { // retry when our row is invalid or deleted as a duplicate
// Try to reserve a row in the DB...
- if ( $this->order === 'timestamp' ) { // oldest first
+ if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
$row = $this->claimOldest( $uuid );
} else { // random first
$rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
);
if ( $row ) { // claim the job
$dbw->update( 'job', // update by PK
- array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+ array(
+ 'job_token' => $uuid,
+ 'job_token_timestamp' => $dbw->timestamp(),
+ 'job_attempts = job_attempts+1' ),
array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
__METHOD__
);
// same table being changed in an UPDATE query in MySQL (gives Error: 1093).
// Oracle and Postgre have no such limitation. However, MySQL offers an
// alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
- $dbw->query( "UPDATE {$dbw->tableName( 'job' )}
- SET
- job_token = {$dbw->addQuotes( $uuid ) },
- job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}
- WHERE (
- job_cmd = {$dbw->addQuotes( $this->type )}
- AND job_token = {$dbw->addQuotes( '' )}
- ) ORDER BY job_random ASC LIMIT 1",
+ $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
+ "SET " .
+ "job_token = {$dbw->addQuotes( $uuid ) }, " .
+ "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
+ "job_attempts = job_attempts+1 " .
+ "WHERE ( " .
+ "job_cmd = {$dbw->addQuotes( $this->type )} " .
+ "AND job_token = {$dbw->addQuotes( '' )} " .
+ ") ORDER BY job_id ASC LIMIT 1",
__METHOD__
);
} else {
// Use a subquery to find the job, within an UPDATE to claim it.
// This uses as much of the DB wrapper functions as possible.
$dbw->update( 'job',
- array( 'job_token' => $uuid, 'job_token_timestamp' => $dbw->timestamp() ),
+ array(
+ 'job_token' => $uuid,
+ 'job_token_timestamp' => $dbw->timestamp(),
+ 'job_attempts = job_attempts+1' ),
array( 'job_id = (' .
$dbw->selectSQLText( 'job', 'job_id',
array( 'job_cmd' => $this->type, 'job_token' => '' ),
__METHOD__,
- array( 'ORDER BY' => 'job_random ASC', 'LIMIT' => 1 ) ) .
+ array( 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ) ) .
')'
),
__METHOD__
return $row;
}
+ /**
+ * Recycle or destroy any jobs that have been claimed for too long
+ *
+ * @return integer Number of jobs recycled/deleted
+ */
+ protected function recycleStaleJobs() {
+ $now = time();
+ $dbw = $this->getMasterDB();
+ $count = 0; // affected rows
+
+ if ( $this->claimTTL > 0 ) { // re-try stale jobs...
+ $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
+ // Reset job_token for these jobs so that other runners will pick them up.
+ // Set the timestamp to the current time, as it is useful to now that the job
+ // was already tried before.
+ $dbw->update( 'job',
+ array(
+ 'job_token' => '',
+ 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
+ array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was acquired
+ "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
+ "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ),
+ __METHOD__
+ );
+ $count += $dbw->affectedRows();
+ }
+
+ // Just destroy stale jobs...
+ $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
+ $conds = array(
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was acquired
+ "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
+ );
+ if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
+ $conds[] = "job_attempts >= {$dbw->addQuotes( self::MAX_ATTEMPTS )}";
+ }
+ $dbw->delete( 'job', $conds, __METHOD__ );
+ $count += $dbw->affectedRows();
+
+ return $count;
+ }
+
/**
* @see JobQueue::doAck()
* @return Job|bool
'job_params' => self::makeBlob( $job->getParams() ),
);
// Additional job metadata
- if ( $this->order === 'timestamp' ) { // oldest first
- $random = time() - 1325376000; // seconds since "January 1, 2012"
- } else { // random first
- $random = mt_rand( 0, self::MAX_JOB_RANDOM );
- }
$dbw = $this->getMasterDB();
$metaFields = array(
'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
'job_timestamp' => $dbw->timestamp(),
'job_sha1' => wfBaseConvert( sha1( serialize( $descFields ) ), 16, 36, 32 ),
- 'job_random' => $random
+ 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
);
return ( $descFields + $metaFields );
}