*/
class JobQueueDB extends JobQueue {
const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
- const CACHE_TTL_LONG = 300; // integer; seconds to cache info that is kept up to date
const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
const MAX_OFFSET = 255; // integer; maximum number of rows to skip
* @return bool
*/
protected function doIsEmpty() {
- $key = $this->getCacheKey( 'empty' );
-
- $isEmpty = $this->cache->get( $key );
- if ( $isEmpty === 'true' ) {
- return true;
- } elseif ( $isEmpty === 'false' ) {
- return false;
- }
-
$dbr = $this->getSlaveDB();
try {
$found = $dbr->selectField( // unclaimed job
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
- $this->cache->add( $key, $found ? 'false' : 'true', self::CACHE_TTL_LONG );
return !$found;
}
foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
$dbw->insert( 'job', $rowBatch, $method );
}
- JobQueue::incrStats( 'job-insert', $this->type, count( $rows ), $this->wiki );
+ JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
JobQueue::incrStats(
'job-insert-duplicate',
$this->type,
- count( $rowSet ) + count( $rowList ) - count( $rows ),
- $this->wiki
+ count( $rowSet ) + count( $rowList ) - count( $rows )
);
} catch ( DBError $e ) {
if ( $flags & self::QOS_ATOMIC ) {
$dbw->commit( $method );
}
- $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
-
return;
}
* @return Job|bool
*/
protected function doPop() {
- if ( $this->cache->get( $this->getCacheKey( 'empty' ) ) === 'true' ) {
- return false; // queue is empty
- }
-
$dbw = $this->getMasterDB();
try {
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
}
// Check if we found a row to reserve...
if ( !$row ) {
- $this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
break; // nothing to do
}
- JobQueue::incrStats( 'job-pop', $this->type, 1, $this->wiki );
+ JobQueue::incrStats( 'job-pop', $this->type );
// Get the job object from the row...
- $title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
- if ( !$title ) {
- $dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- wfDebug( "Row has invalid title '{$row->job_title}'.\n" );
- continue; // try again
- }
+ $title = Title::makeTitle( $row->job_namespace, $row->job_title );
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
$job->metadata['id'] = $row->job_id;
// Delete a row with a single DELETE without holding row locks over RTTs...
$dbw->delete( 'job',
array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
+
+ JobQueue::incrStats( 'job-ack', $this->type );
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
* @return void
*/
protected function doFlushCaches() {
- foreach ( array( 'empty', 'size', 'acquiredcount' ) as $type ) {
+ foreach ( array( 'size', 'acquiredcount' ) as $type ) {
$this->cache->delete( $this->getCacheKey( $type ) );
}
}
);
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'job-recycle', $this->type, $affected, $this->wiki );
- $this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
+ JobQueue::incrStats( 'job-recycle', $this->type, $affected );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
}
}
$dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'job-abandon', $this->type, $affected, $this->wiki );
+ JobQueue::incrStats( 'job-abandon', $this->type, $affected );
}
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );