* @file
*/
use Wikimedia\Rdbms\IDatabase;
-use Wikimedia\Rdbms\DBConnRef;
+use Wikimedia\Rdbms\Database;
use Wikimedia\Rdbms\DBConnectionError;
use Wikimedia\Rdbms\DBError;
use MediaWiki\MediaWikiServices;
/** @var WANObjectCache */
protected $cache;
+ /** @var IDatabase|DBError|null */
+ protected $conn;
- /** @var bool|string Name of an external DB cluster. False if not set */
- protected $cluster = false;
+ /** @var array|null Server configuration array */
+ protected $server;
+ /** @var string|null Name of an external DB cluster or null for the local DB cluster */
+ protected $cluster;
/**
* Additional parameters include:
+ * - server : Server configuration array for Database::factory. Overrides "cluster".
* - cluster : The name of an external cluster registered via LBFactory.
* If not specified, the primary DB cluster for the wiki will be used.
* This can be overridden with a custom cluster so that DB handles will
* be retrieved via LBFactory::getExternalLB() and getConnection().
+ * - wanCache : An instance of WANObjectCache to use for caching.
* @param array $params
*/
protected function __construct( array $params ) {
parent::__construct( $params );
- $this->cluster = $params['cluster'] ?? false;
- $this->cache = ObjectCache::getMainWANInstance();
+ if ( isset( $params['server'] ) ) {
+ $this->server = $params['server'];
+ } elseif ( isset( $params['cluster'] ) && is_string( $params['cluster'] ) ) {
+ $this->cluster = $params['cluster'];
+ }
+
+ $this->cache = $params['wanCache'] ?? WANObjectCache::newEmpty();
}
protected function supportedOrders() {
*/
protected function doIsEmpty() {
$dbr = $this->getReplicaDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbr );
try {
$found = $dbr->selectField( // unclaimed job
'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
return $size;
}
+ $dbr = $this->getReplicaDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbr );
try {
- $dbr = $this->getReplicaDB();
$size = (int)$dbr->selectField( 'job', 'COUNT(*)',
[ 'job_cmd' => $this->type, 'job_token' => '' ],
__METHOD__
}
$dbr = $this->getReplicaDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbr );
try {
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
[ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
}
$dbr = $this->getReplicaDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbr );
try {
$count = (int)$dbr->selectField( 'job', 'COUNT(*)',
[
*/
protected function doBatchPush( array $jobs, $flags ) {
$dbw = $this->getMasterDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbw );
// In general, there will be two cases here:
// a) sqlite; DB connection is probably a regular round-aware handle.
// If the connection is busy with a transaction, then defer the job writes
foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
$dbw->insert( 'job', $rowBatch, $method );
}
- JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
- JobQueue::incrStats( 'dupe_inserts', $this->type,
+ $this->incrStats( 'inserts', $this->type, count( $rows ) );
+ $this->incrStats( 'dupe_inserts', $this->type,
count( $rowSet ) + count( $rowList ) - count( $rows )
);
} catch ( DBError $e ) {
*/
protected function doPop() {
$dbw = $this->getMasterDB();
- try {
- $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
- $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
- $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
- $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
- } );
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbw );
+ $job = false; // job popped off
+ try {
$uuid = wfRandomString( 32 ); // pop attempt
- $job = false; // job popped off
do { // retry when our row is invalid or deleted as a duplicate
// Try to reserve a row in the DB...
if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
if ( !$row ) {
break; // nothing to do
}
- JobQueue::incrStats( 'pops', $this->type );
+ $this->incrStats( 'pops', $this->type );
// Get the job object from the row...
$title = Title::makeTitle( $row->job_namespace, $row->job_title );
$job = Job::factory( $row->job_cmd, $title,
self::extractBlob( $row->job_params ) );
- $job->metadata['id'] = $row->job_id;
- $job->metadata['timestamp'] = $row->job_timestamp;
+ $job->setMetadata( 'id', $row->job_id );
+ $job->setMetadata( 'timestamp', $row->job_timestamp );
break; // done
} while ( true );
*/
protected function claimRandom( $uuid, $rand, $gte ) {
$dbw = $this->getMasterDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbw );
// Check cache to see if the queue has <= OFFSET items
$tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
*/
protected function claimOldest( $uuid ) {
$dbw = $this->getMasterDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbw );
$row = false; // the row acquired
do {
* @throws MWException
*/
protected function doAck( Job $job ) {
- if ( !isset( $job->metadata['id'] ) ) {
+ $id = $job->getMetadata( 'id' );
+ if ( $id === null ) {
throw new MWException( "Job of type '{$job->getType()}' has no ID." );
}
$dbw = $this->getMasterDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbw );
try {
- $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
- $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
- $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
- $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
- } );
-
// Delete a row with a single DELETE without holding row locks over RTTs...
- $dbw->delete( 'job',
- [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ );
+ $dbw->delete(
+ 'job',
+ [ 'job_cmd' => $this->type, 'job_id' => $id ],
+ __METHOD__
+ );
- JobQueue::incrStats( 'acks', $this->type );
+ $this->incrStats( 'acks', $this->type );
} catch ( DBError $e ) {
$this->throwDBException( $e );
}
// maintained. Having only the de-duplication registration succeed would cause
// jobs to become no-ops without any actual jobs that made them redundant.
$dbw = $this->getMasterDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbw );
+
$cache = $this->dupCache;
$dbw->onTransactionCommitOrIdle(
function () use ( $cache, $params, $key ) {
*/
protected function doDelete() {
$dbw = $this->getMasterDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbw );
try {
$dbw->delete( 'job', [ 'job_cmd' => $this->type ] );
} catch ( DBError $e ) {
* @return void
*/
protected function doWaitForBackups() {
+ if ( $this->server ) {
+ return; // not using LBFactory instance
+ }
+
$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
- $lbFactory->waitForReplication(
- [ 'domain' => $this->domain, 'cluster' => $this->cluster ] );
+ $lbFactory->waitForReplication( [
+ 'domain' => $this->domain,
+ 'cluster' => is_string( $this->cluster ) ? $this->cluster : false
+ ] );
}
/**
*/
protected function getJobIterator( array $conds ) {
$dbr = $this->getReplicaDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbr );
try {
return new MappedIterator(
$dbr->select( 'job', self::selectFields(), $conds ),
Title::makeTitle( $row->job_namespace, $row->job_title ),
strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
);
- $job->metadata['id'] = $row->job_id;
- $job->metadata['timestamp'] = $row->job_timestamp;
+ $job->setMetadata( 'id', $row->job_id );
+ $job->setMetadata( 'timestamp', $row->job_timestamp );
return $job;
}
}
public function getCoalesceLocationInternal() {
- return $this->cluster
+ if ( $this->server ) {
+ return null; // not using the LBFactory instance
+ }
+
+ return is_string( $this->cluster )
? "DBCluster:{$this->cluster}:{$this->domain}"
: "LBFactory:{$this->domain}";
}
protected function doGetSiblingQueuesWithJobs( array $types ) {
$dbr = $this->getReplicaDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbr );
// @note: this does not check whether the jobs are claimed or not.
// This is useful so JobQueueGroup::pop() also sees queues that only
// have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
protected function doGetSiblingQueueSizes( array $types ) {
$dbr = $this->getReplicaDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbr );
+
$res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
[ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );
$now = time();
$count = 0; // affected rows
$dbw = $this->getMasterDB();
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scope = $this->getScopedNoTrxFlag( $dbw );
try {
if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
);
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'recycles', $this->type, $affected );
- $this->aggr->notifyQueueNonEmpty( $this->domain, $this->type );
+ $this->incrStats( 'recycles', $this->type, $affected );
}
}
$dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
$affected = $dbw->affectedRows();
$count += $affected;
- JobQueue::incrStats( 'abandons', $this->type, $affected );
+ $this->incrStats( 'abandons', $this->type, $affected );
}
$dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
/**
* @throws JobQueueConnectionError
- * @return DBConnRef
+ * @return IDatabase
*/
protected function getReplicaDB() {
try {
/**
* @throws JobQueueConnectionError
- * @return DBConnRef
+ * @return IDatabase
*/
protected function getMasterDB() {
try {
/**
* @param int $index (DB_REPLICA/DB_MASTER)
- * @return DBConnRef
+ * @return IDatabase
*/
protected function getDB( $index ) {
- $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
- $lb = ( $this->cluster !== false )
- ? $lbFactory->getExternalLB( $this->cluster )
- : $lbFactory->getMainLB( $this->domain );
-
- return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
- // Keep a separate connection to avoid contention and deadlocks;
- // However, SQLite has the opposite behavior due to DB-level locking.
- ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
- // Jobs insertion will be defered until the PRESEND stage to reduce contention.
- : $lb->getConnectionRef( $index, [], $this->domain );
+ if ( $this->server ) {
+ if ( $this->conn instanceof IDatabase ) {
+ return $this->conn;
+ } elseif ( $this->conn instanceof DBError ) {
+ throw $this->conn;
+ }
+
+ try {
+ $this->conn = Database::factory( $this->server['type'], $this->server );
+ } catch ( DBError $e ) {
+ $this->conn = $e;
+ throw $e;
+ }
+
+ return $this->conn;
+ } else {
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $lb = is_string( $this->cluster )
+ ? $lbFactory->getExternalLB( $this->cluster )
+ : $lbFactory->getMainLB( $this->domain );
+
+ return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
+ // Keep a separate connection to avoid contention and deadlocks;
+ // However, SQLite has the opposite behavior due to DB-level locking.
+ ? $lb->getConnectionRef( $index, [], $this->domain, $lb::CONN_TRX_AUTOCOMMIT )
+ // Jobs insertion will be defered until the PRESEND stage to reduce contention.
+ : $lb->getConnectionRef( $index, [], $this->domain );
+ }
+ }
+
+ /**
+ * @param IDatabase $db
+ * @return ScopedCallback
+ */
+ private function getScopedNoTrxFlag( IDatabase $db ) {
+ $autoTrx = $db->getFlag( DBO_TRX ); // get current setting
+ $db->clearFlag( DBO_TRX ); // make each query its own transaction
+
+ return new ScopedCallback( function () use ( $db, $autoTrx ) {
+ if ( $autoTrx ) {
+ $db->setFlag( DBO_TRX ); // restore old setting
+ }
+ } );
}
/**