return false;
}
- $found = $this->getSlaveDB()->selectField(
- 'job', '1', array( 'job_cmd' => $this->type ), __METHOD__
+ $found = $this->getSlaveDB()->selectField( // unclaimed job
+ 'job', '1', array( 'job_cmd' => $this->type, 'job_token' => '' ), __METHOD__
);
-
$wgMemc->add( $key, $found ? 'false' : 'true', self::CACHE_TTL );
+
+ return !$found;
}
/**
* @see JobQueue::doBatchPush()
+ * @param array $jobs
+ * @param $flags
+ * @throws DBError|Exception
* @return bool
*/
protected function doBatchPush( array $jobs, $flags ) {
foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
$dbw->insert( 'job', $rowBatch, __METHOD__ );
}
+ wfIncrStats( 'job-insert', count( $rows ) );
} catch ( DBError $e ) {
if ( $atomic ) {
$dbw->rollback( __METHOD__ );
$wgMemc->set( $this->getEmptinessCacheKey(), 'true', self::CACHE_TTL );
break; // nothing to do
}
+ wfIncrStats( 'job-pop' );
// Get the job object from the row...
$title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
if ( !$title ) {
$dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- wfIncrStats( 'job-pop' );
wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
continue; // try again
}
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
+ $job->id = $row->job_id; // XXX: work around broken subclasses
// Flag this job as an old duplicate based on its "root" job...
if ( $this->isRootJobOldDuplicate( $job ) ) {
$job = DuplicateJob::newFromJob( $job ); // convert to a no-op
*/
protected function claimRandom( $uuid, $rand, $gte ) {
$dbw = $this->getMasterDB();
- $dir = $gte ? 'ASC' : 'DESC';
$ineq = $gte ? '>=' : '<=';
$row = false; // the row acquired
'job_cmd' => $this->type,
'job_token' => '',
"job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
- __METHOD__,
- array( 'ORDER BY' => "job_random {$dir}" )
+ __METHOD__
+ // Bug 42614: "ORDER BY job_random" causes slowness on mysql for some reason
);
if ( $row ) { // claim the job
$dbw->update( 'job', // update by PK
$dbw = $this->getMasterDB();
$count = 0; // affected rows
- if ( $this->claimTTL > 0 ) { // re-try stale jobs...
+ if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
+ return $count; // already in progress
+ }
+
+ // Remove claims on jobs acquired for too long if enabled...
+ if ( $this->claimTTL > 0 ) {
$claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
- // Reset job_token for these jobs so that other runners will pick them up.
- // Set the timestamp to the current time, as it is useful to now that the job
- // was already tried before.
- $dbw->update( 'job',
- array(
- 'job_token' => '',
- 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
+ // Get the IDs of jobs that have be claimed but not finished after too long.
+ // These jobs can be recycled into the queue by expiring the claim. Selecting
+ // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
+ $res = $dbw->select( 'job', 'job_id',
array(
'job_cmd' => $this->type,
"job_token != {$dbw->addQuotes( '' )}", // was acquired
"job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
- "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ),
+ "job_attempts < {$dbw->addQuotes( self::MAX_ATTEMPTS )}" ), // retries left
__METHOD__
);
- $count += $dbw->affectedRows();
+ $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) );
+ if ( count( $ids ) ) {
+ // Reset job_token for these jobs so that other runners will pick them up.
+ // Set the timestamp to the current time, as it is useful to now that the job
+ // was already tried before (the timestamp becomes the "released" time).
+ $dbw->update( 'job',
+ array(
+ 'job_token' => '',
+ 'job_token_timestamp' => $dbw->timestamp( $now ) ), // time of release
+ array(
+ 'job_id' => $ids ),
+ __METHOD__
+ );
+ $count += $dbw->affectedRows();
+ }
}
- // Just destroy stale jobs...
+ // Just destroy any stale jobs...
$pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
$conds = array(
'job_cmd' => $this->type,
if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
$conds[] = "job_attempts >= {$dbw->addQuotes( self::MAX_ATTEMPTS )}";
}
- $dbw->delete( 'job', $conds, __METHOD__ );
- $count += $dbw->affectedRows();
+ // Get the IDs of jobs that are considered stale and should be removed. Selecting
+ // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
+ $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
+ $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) );
+ if ( count( $ids ) ) {
+ $dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
+ $count += $dbw->affectedRows();
+ }
+
+ $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
return $count;
}
/**
* @see JobQueue::doAck()
+ * @param Job $job
+ * @throws MWException
* @return Job|bool
*/
protected function doAck( Job $job ) {
+ if ( !$job->getId() ) {
+ throw new MWException( "Job of type '{$job->getType()}' has no ID." );
+ }
+
$dbw = $this->getMasterDB();
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
// Delete a row with a single DELETE without holding row locks over RTTs...
- $dbw->delete( 'job', array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ) );
+ $dbw->delete( 'job',
+ array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ), __METHOD__ );
return true;
}
/**
* @see JobQueue::doDeduplicateRootJob()
+ * @param Job $job
+ * @throws MWException
* @return bool
*/
protected function doDeduplicateRootJob( Job $job ) {
$key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
// Callers should call batchInsert() and then this function so that if the insert
// fails, the de-duplication registration will be aborted. Since the insert is
- // deferred till "transaction idle", do that same here, so that the ordering is
+ // deferred till "transaction idle", do the same here, so that the ordering is
// maintained. Having only the de-duplication registration succeed would cause
// jobs to become no-ops without any actual jobs that made them redundant.
$this->getMasterDB()->onTransactionIdle( function() use ( $params, $key ) {