use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\DBError;
use MediaWiki\MediaWikiServices;
+use Wikimedia\Rdbms\IMaintainableDatabase;
use Wikimedia\ScopedCallback;
/**
const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
const MAX_OFFSET = 255; // integer; maximum number of rows to skip
- /** @var WANObjectCache */
- protected $cache;
- /** @var IDatabase|DBError|null */
+ /** @var IMaintainableDatabase|DBError|null */
protected $conn;
/** @var array|null Server configuration array */
* If not specified, the primary DB cluster for the wiki will be used.
* This can be overridden with a custom cluster so that DB handles will
* be retrieved via LBFactory::getExternalLB() and getConnection().
- * - wanCache : An instance of WANObjectCache to use for caching.
* @param array $params
*/
protected function __construct( array $params ) {
} elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
$this->cluster = $params['cluster'];
}
-
- $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty();
}
protected function supportedOrders() {
'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
return !$found;
protected function doGetSize() {
$key = $this->getCacheKey( 'size' );
- $size = $this->cache->get( $key );
+ $size = $this->wanCache->get( $key );
if ( is_int( $size ) ) {
return $size;
}
__METHOD__
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
- $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
+ $this->wanCache->set( $key, $size, self::CACHE_TTL_SHORT );
return $size;
}
$key = $this->getCacheKey( 'acquiredcount' );
- $count = $this->cache->get( $key );
+ $count = $this->wanCache->get( $key );
if ( is_int( $count ) ) {
return $count;
}
__METHOD__
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
- $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+ $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
return $count;
}
$key = $this->getCacheKey( 'abandonedcount' );
- $count = $this->cache->get( $key );
+ $count = $this->wanCache->get( $key );
if ( is_int( $count ) ) {
return $count;
}
__METHOD__
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
- $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+ $this->wanCache->set( $key, $count, self::CACHE_TTL_SHORT );
return $count;
}
count( $rowSet ) + count( $rowList ) - count( $rows )
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
if ( $flags & self::QOS_ATOMIC ) {
$dbw->endAtomic( $method );
/**
* @see JobQueue::doPop()
- * @return Job|bool
+ * @return RunnableJob|bool
*/
protected function doPop() {
$dbw = $this->getMasterDB();
break; // nothing to do
}
$this->incrStats( 'pops', $this->type );
+
// Get the job object from the row...
- $title = Title::makeTitle( $row->job_namespace, $row->job_title );
- $job = Job::factory( $row->job_cmd, $title,
- self::extractBlob( $row->job_params ) );
- $job->setMetadata( 'id', $row->job_id );
- $job->setMetadata( 'timestamp', $row->job_timestamp );
+ $job = $this->jobFromRow( $row );
break; // done
} while ( true );
$this->recycleAndDeleteStaleJobs();
}
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
return $job;
/** @noinspection PhpUnusedLocalVariableInspection */
$scope = $this->getScopedNoTrxFlag( $dbw );
// Check cache to see if the queue has <= OFFSET items
- $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
+ $tinyQueue = $this->wanCache->get( $this->getCacheKey( 'small' ) );
- $row = false; // the row acquired
$invertedDirection = false; // whether one job_random direction was already scanned
// This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
// instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
);
if ( !$row ) {
$tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
- $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
+ $this->wanCache->set( $this->getCacheKey( 'small' ), 1, 30 );
continue; // use job_random
}
}
/**
* @see JobQueue::doAck()
- * @param Job $job
+ * @param RunnableJob $job
* @throws MWException
*/
- protected function doAck( Job $job ) {
+ protected function doAck( RunnableJob $job ) {
$id = $job->getMetadata( 'id' );
if ( $id === null ) {
throw new MWException( "Job of type '{$job->getType()}' has no ID." );
$this->incrStats( 'acks', $this->type );
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
}
* @return bool
*/
protected function doDeduplicateRootJob( IJobSpecification $job ) {
- $params = $job->getParams();
- if ( !isset( $params['rootJobSignature'] ) ) {
- throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
- } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
- throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
- }
- $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
- // Callers should call JobQueueGroup::push() before this method so that if the insert
- // fails, the de-duplication registration will be aborted. Since the insert is
- // deferred till "transaction idle", do the same here, so that the ordering is
+ // Callers should call JobQueueGroup::push() before this method so that if the
+ // insert fails, the de-duplication registration will be aborted. Since the insert
+ // is deferred till "transaction idle", do the same here, so that the ordering is
// maintained. Having only the de-duplication registration succeed would cause
// jobs to become no-ops without any actual jobs that made them redundant.
$dbw = $this->getMasterDB();
/** @noinspection PhpUnusedLocalVariableInspection */
$scope = $this->getScopedNoTrxFlag( $dbw );
-
- $cache = $this->dupCache;
$dbw->onTransactionCommitOrIdle(
- function () use ( $cache, $params, $key ) {
- $timestamp = $cache->get( $key ); // current last timestamp of this job
- if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
- return true; // a newer version of this root job was enqueued
- }
-
- // Update the timestamp of the last root job started at the location...
- return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+ function () use ( $job ) {
+ parent::doDeduplicateRootJob( $job );
},
__METHOD__
);
try {
$dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
return true;
*/
protected function doFlushCaches() {
foreach ( [ 'size', 'acquiredcount' ] as $type ) {
- $this->cache->delete( $this->getCacheKey( $type ) );
+ $this->wanCache->delete( $this->getCacheKey( $type ) );
}
}
return new MappedIterator(
$dbr->select( 'job', self::selectFields(), $conds ),
function ( $row ) {
- $job = Job::factory(
- $row->job_cmd,
- Title::makeTitle( $row->job_namespace, $row->job_title ),
- strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
- );
- $job->setMetadata( 'id', $row->job_id );
- $job->setMetadata( 'timestamp', $row->job_timestamp );
-
- return $job;
+ return $this->jobFromRow( $row );
}
);
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
}
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
} catch ( DBError $e ) {
- $this->throwDBException( $e );
+ throw $this->getDBException( $e );
}
return $count;
return [
// Fields that describe the nature of the job
'job_cmd' => $job->getType(),
- 'job_namespace' => $job->getTitle()->getNamespace(),
- 'job_title' => $job->getTitle()->getDBkey(),
+ 'job_namespace' => $job->getParams()['namespace'] ?? NS_SPECIAL,
+ 'job_title' => $job->getParams()['title'] ?? '',
'job_params' => self::makeBlob( $job->getParams() ),
// Additional job metadata
'job_timestamp' => $db->timestamp(),
/**
* @throws JobQueueConnectionError
- * @return IDatabase
+ * @return IMaintainableDatabase
*/
protected function getMasterDB() {
try {
/**
* @param int $index (DB_REPLICA/DB_MASTER)
- * @return IDatabase
+ * @return IMaintainableDatabase
*/
protected function getDB( $index ) {
if ( $this->server ) {
? $lbFactory->getExternalLB( $this->cluster )
: $lbFactory->getMainLB( $this->domain );
- return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
+ if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) {
// Keep a separate connection to avoid contention and deadlocks;
// However, SQLite has the opposite behavior due to DB-level locking.
- ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
+ $flags = $lb::CONN_TRX_AUTOCOMMIT;
+ } else {
// Jobs insertion will be defered until the PRESEND stage to reduce contention.
- : $lb->getConnectionRef( $index, [], $this->domain );
+ $flags = 0;
+ }
+
+ return $lb->getMaintenanceConnectionRef( $index, [], $this->domain, $flags );
}
}
private function getCacheKey( $property ) {
$cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
- return $this->cache->makeGlobalKey(
+ return $this->wanCache->makeGlobalKey(
'jobqueue',
$this->domain,
$cluster,
}
/**
- * @param string $blob
- * @return bool|mixed
+ * @param stdClass $row
+ * @return RunnableJob|null
*/
- protected static function extractBlob( $blob ) {
- if ( (string)$blob !== '' ) {
- return unserialize( $blob );
- } else {
- return false;
+ protected function jobFromRow( $row ) {
+ $params = ( (string)$row->job_params !== '' ) ? unserialize( $row->job_params ) : [];
+ if ( !is_array( $params ) ) { // this shouldn't happen
+ throw new UnexpectedValueException(
+ "Could not unserialize job with ID '{$row->job_id}'." );
}
+
+ $params += [ 'namespace' => $row->job_namespace, 'title' => $row->job_title ];
+ $job = $this->factoryJob( $row->job_cmd, $params );
+ $job->setMetadata( 'id', $row->job_id );
+ $job->setMetadata( 'timestamp', $row->job_timestamp );
+
+ return $job;
}
/**
* @param DBError $e
- * @throws JobQueueError
+ * @return JobQueueError
*/
- protected function throwDBException( DBError $e ) {
- throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
+ protected function getDBException( DBError $e ) {
+ return new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
}
/**