* @since 1.21
*/
class JobQueueDB extends JobQueue {
+ const ROOTJOB_TTL = 1209600; // integer; seconds to remember root jobs (14 days)
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
- const MAX_ATTEMPTS = 3; // integer; number of times to try a job
const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
const MAX_OFFSET = 255; // integer; maximum number of rows to skip
protected function doGetAcquiredCount() {
global $wgMemc;
+ if ( $this->claimTTL <= 0 ) {
+ return 0; // no acknowledgements
+ }
+
$key = $this->getCacheKey( 'acquiredcount' );
$count = $wgMemc->get( $key );
list( $dbr, $scope ) = $this->getSlaveDB();
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
- array( 'job_cmd' => $this->type, "job_token !={$dbr->addQuotes('')}" ),
+ array( 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ),
__METHOD__
);
$wgMemc->set( $key, $count, self::CACHE_TTL_SHORT );
$uuid = wfRandomString( 32 ); // pop attempt
$job = false; // job popped off
- // Occasionally recycle jobs back into the queue that have been claimed too long
- if ( mt_rand( 0, 99 ) == 0 ) {
- $this->recycleStaleJobs();
- }
do { // retry when our row is invalid or deleted as a duplicate
// Try to reserve a row in the DB...
if ( in_array( $this->order, array( 'fifo', 'timestamp' ) ) ) {
*
* @return integer Number of jobs recycled/deleted
*/
- protected function recycleStaleJobs() {
- $now = time();
+ public function recycleAndDeleteStaleJobs() {
+ global $wgMemc;
+
+ $now = time();
list( $dbw, $scope ) = $this->getMasterDB();
$count = 0; // affected rows
);
$count += $dbw->affectedRows();
wfIncrStats( 'job-recycle', $dbw->affectedRows() );
+ $wgMemc->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
}
}
}
// Update the timestamp of the last root job started at the location...
- return $wgMemc->set( $key, $params['rootJobTimestamp'], 14*86400 ); // 2 weeks
+ return $wgMemc->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
} );
return true;
wfWaitForSlaves();
}
+ /**
+ * @return Array
+ */
+ protected function doGetPeriodicTasks() {
+ return array(
+ 'recycleAndDeleteStaleJobs' => array(
+ 'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
+ 'period' => ceil( $this->claimTTL / 2 )
+ )
+ );
+ }
+
+ /**
+ * @return void
+ */
+ protected function doFlushCaches() {
+ global $wgMemc;
+
+ foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
+ $wgMemc->delete( $this->getCacheKey( $type ) );
+ }
+ }
+
/**
* @return Array (DatabaseBase, ScopedCallback)
*/