* @return bool
*/
protected function doBatchPush( array $jobs, $flags ) {
- if ( count( $jobs ) ) {
- list( $dbw, $scope ) = $this->getMasterDB();
-
- $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
- $rowList = array(); // list of jobs for jobs that are are not de-duplicated
-
- foreach ( $jobs as $job ) {
- $row = $this->insertFields( $job );
- if ( $job->ignoreDuplicates() ) {
- $rowSet[$row['job_sha1']] = $row;
- } else {
- $rowList[] = $row;
- }
+ list( $dbw, $scope ) = $this->getMasterDB();
+
+ $that = $this;
+ $method = __METHOD__;
+ $dbw->onTransactionIdle(
+ function() use ( $dbw, $that, $jobs, $flags, $method, $scope ) {
+ $that->doBatchPushInternal( $dbw, $jobs, $flags, $method );
}
+ );
- $key = $this->getCacheKey( 'empty' );
- $atomic = ( $flags & self::QOS_ATOMIC );
- $cache = $this->cache;
- $method = __METHOD__;
+ return true;
+ }
- $dbw->onTransactionIdle(
- function() use ( $dbw, $cache, $rowSet, $rowList, $atomic, $key, $method, $scope
- ) {
- if ( $atomic ) {
- $dbw->begin( $method ); // wrap all the job additions in one transaction
- }
- try {
- // Strip out any duplicate jobs that are already in the queue...
- if ( count( $rowSet ) ) {
- $res = $dbw->select( 'job', 'job_sha1',
- array(
- // No job_type condition since it's part of the job_sha1 hash
- 'job_sha1' => array_keys( $rowSet ),
- 'job_token' => '' // unclaimed
- ),
- $method
- );
- foreach ( $res as $row ) {
- wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate." );
- unset( $rowSet[$row->job_sha1] ); // already enqueued
- }
- }
- // Build the full list of job rows to insert
- $rows = array_merge( $rowList, array_values( $rowSet ) );
- // Insert the job rows in chunks to avoid slave lag...
- foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
- $dbw->insert( 'job', $rowBatch, $method );
- }
- wfIncrStats( 'job-insert', count( $rows ) );
- wfIncrStats( 'job-insert-duplicate',
- count( $rowSet ) + count( $rowList ) - count( $rows ) );
- } catch ( DBError $e ) {
- if ( $atomic ) {
- $dbw->rollback( $method );
- }
- throw $e;
- }
- if ( $atomic ) {
- $dbw->commit( $method );
- }
+ /**
+ * This function should *not* be called outside of JobQueueDB
+ *
+ * @param DatabaseBase $dbw
+ * @param array $jobs
+ * @param int $flags
+ * @param string $method
+ * @return boolean
+ * @throws type
+ */
+ public function doBatchPushInternal( DatabaseBase $dbw, array $jobs, $flags, $method ) {
+ if ( !count( $jobs ) ) {
+ return true;
+ }
- $cache->set( $key, 'false', JobQueueDB::CACHE_TTL_LONG );
- } );
+ $rowSet = array(); // (sha1 => job) map for jobs that are de-duplicated
+ $rowList = array(); // list of jobs for jobs that are are not de-duplicated
+ foreach ( $jobs as $job ) {
+ $row = $this->insertFields( $job );
+ if ( $job->ignoreDuplicates() ) {
+ $rowSet[$row['job_sha1']] = $row;
+ } else {
+ $rowList[] = $row;
+ }
}
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->begin( $method ); // wrap all the job additions in one transaction
+ }
+ try {
+ // Strip out any duplicate jobs that are already in the queue...
+ if ( count( $rowSet ) ) {
+ $res = $dbw->select( 'job', 'job_sha1',
+ array(
+ // No job_type condition since it's part of the job_sha1 hash
+ 'job_sha1' => array_keys( $rowSet ),
+ 'job_token' => '' // unclaimed
+ ),
+ $method
+ );
+ foreach ( $res as $row ) {
+ wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
+ unset( $rowSet[$row->job_sha1] ); // already enqueued
+ }
+ }
+ // Build the full list of job rows to insert
+ $rows = array_merge( $rowList, array_values( $rowSet ) );
+ // Insert the job rows in chunks to avoid slave lag...
+ foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
+ $dbw->insert( 'job', $rowBatch, $method );
+ }
+ JobQueue::incrStats( 'job-insert', $this->type, count( $rows ) );
+ JobQueue::incrStats( 'job-insert-duplicate', $this->type,
+ count( $rowSet ) + count( $rowList ) - count( $rows ) );
+ } catch ( DBError $e ) {
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->rollback( $method );
+ }
+ throw $e;
+ }
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->commit( $method );
+ }
+
+ $this->cache->set( $this->getCacheKey( 'empty' ), 'false', JobQueueDB::CACHE_TTL_LONG );
+
return true;
}
list( $dbw, $scope ) = $this->getMasterDB();
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
+ $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
+ } );
$uuid = wfRandomString( 32 ); // pop attempt
$job = false; // job popped off
$this->cache->set( $this->getCacheKey( 'empty' ), 'true', self::CACHE_TTL_LONG );
break; // nothing to do
}
- wfIncrStats( 'job-pop' );
+ JobQueue::incrStats( 'job-pop', $this->type );
// Get the job object from the row...
$title = Title::makeTitleSafe( $row->job_namespace, $row->job_title );
if ( !$title ) {
$dbw->delete( 'job', array( 'job_id' => $row->job_id ), __METHOD__ );
- wfDebugLog( 'JobQueueDB', "Row has invalid title '{$row->job_title}'." );
+ wfDebug( "Row has invalid title '{$row->job_title}'." );
continue; // try again
}
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ), $row->job_id );
+ $job->metadata['id'] = $row->job_id;
$job->id = $row->job_id; // XXX: work around broken subclasses
break; // done
- } while( true );
+ } while ( true );
return $job;
}
$dir = $gte ? 'ASC' : 'DESC';
$row = $dbw->selectRow( 'job', '*', // find a random job
array(
- 'job_cmd' => $this->type,
+ 'job_cmd' => $this->type,
'job_token' => '', // unclaimed
"job_random {$ineq} {$dbw->addQuotes( $rand )}" ),
__METHOD__,
// instead of job_random for reducing excess claim retries.
$row = $dbw->selectRow( 'job', '*', // find a random job
array(
- 'job_cmd' => $this->type,
+ 'job_cmd' => $this->type,
'job_token' => '', // unclaimed
),
__METHOD__,
if ( $row ) { // claim the job
$dbw->update( 'job', // update by PK
array(
- 'job_token' => $uuid,
+ 'job_token' => $uuid,
'job_token_timestamp' => $dbw->timestamp(),
'job_attempts = job_attempts+1' ),
array( 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ),
// This uses as much of the DB wrapper functions as possible.
$dbw->update( 'job',
array(
- 'job_token' => $uuid,
+ 'job_token' => $uuid,
'job_token_timestamp' => $dbw->timestamp(),
'job_attempts = job_attempts+1' ),
array( 'job_id = (' .
array( 'job_cmd' => $this->type, 'job_token' => $uuid ), __METHOD__
);
if ( !$row ) { // raced out by duplicate job removal
- wfDebugLog( 'JobQueueDB', "Row deleted as duplicate by another process." );
+ wfDebug( "Row deleted as duplicate by another process." );
}
} else {
break; // nothing to do
* @return Job|bool
*/
protected function doAck( Job $job ) {
- if ( !$job->getId() ) {
+ if ( !isset( $job->metadata['id'] ) ) {
throw new MWException( "Job of type '{$job->getType()}' has no ID." );
}
list( $dbw, $scope ) = $this->getMasterDB();
$dbw->commit( __METHOD__, 'flush' ); // flush existing transaction
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
+ $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ $scopedReset = new ScopedCallback( function() use ( $dbw, $autoTrx ) {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
+ } );
// Delete a row with a single DELETE without holding row locks over RTTs...
$dbw->delete( 'job',
- array( 'job_cmd' => $this->type, 'job_id' => $job->getId() ), __METHOD__ );
+ array( 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ), __METHOD__ );
return true;
}
return true;
}
+ /**
+ * @see JobQueue::doDelete()
+ * @return bool
+ */
+ protected function doDelete() {
+ list( $dbw, $scope ) = $this->getMasterDB();
+
+ $dbw->delete( 'job', array( 'job_cmd' => $this->type ) );
+ return true;
+ }
+
/**
* @see JobQueue::doWaitForBackups()
* @return void
return array(
'recycleAndDeleteStaleJobs' => array(
'callback' => array( $this, 'recycleAndDeleteStaleJobs' ),
- 'period' => ceil( $this->claimTTL / 2 )
+ 'period' => ceil( $this->claimTTL / 2 )
)
);
}
strlen( $row->job_params ) ? unserialize( $row->job_params ) : false,
$row->job_id
);
+ $job->metadata['id'] = $row->job_id;
$job->id = $row->job_id; // XXX: work around broken subclasses
return $job;
}
"job_attempts < {$dbw->addQuotes( $this->maxTries )}" ), // retries left
__METHOD__
);
- $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) );
+ $ids = array_map(
+ function( $o ) {
+ return $o->job_id;
+ }, iterator_to_array( $res )
+ );
if ( count( $ids ) ) {
// Reset job_token for these jobs so that other runners will pick them up.
// Set the timestamp to the current time, as it is useful to now that the job
__METHOD__
);
$count += $dbw->affectedRows();
- wfIncrStats( 'job-recycle', $dbw->affectedRows() );
+ JobQueue::incrStats( 'job-recycle', $this->type, $dbw->affectedRows() );
$this->cache->set( $this->getCacheKey( 'empty' ), 'false', self::CACHE_TTL_LONG );
}
}
// Get the IDs of jobs that are considered stale and should be removed. Selecting
// the IDs first means that the UPDATE can be done by primary key (less deadlocks).
$res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
- $ids = array_map( function( $o ) { return $o->job_id; }, iterator_to_array( $res ) );
+ $ids = array_map(
+ function( $o ) {
+ return $o->job_id;
+ }, iterator_to_array( $res )
+ );
if ( count( $ids ) ) {
$dbw->delete( 'job', array( 'job_id' => $ids ), __METHOD__ );
$count += $dbw->affectedRows();
+ JobQueue::incrStats( 'job-abandon', $this->type, $dbw->affectedRows() );
}
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
*/
private function getCacheKey( $property ) {
list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
- return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, $property );
+ $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
}
/**