* @return void
*/
protected function doBatchPush( array $jobs, $flags ) {
- DeferredUpdates::addUpdate(
- new AutoCommitUpdate(
- $this->getMasterDB(),
- __METHOD__,
- function ( IDatabase $dbw, $fname ) use ( $jobs, $flags ) {
- $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
- }
- ),
- DeferredUpdates::PRESEND
+ $dbw = $this->getMasterDB();
+ // In general, there will be two cases here:
+ // a) sqlite; DB connection is probably a regular round-aware handle.
+ // If the connection is busy with a transaction, then defer the job writes
+ // until right before the main round commit step. Any errors that bubble
+ // up will rollback the main commit round.
+ // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
+ // No transaction is active nor will be started by writes, so enqueue the jobs
+ // now so that any errors will show up immediately as the interface expects. Any
+ // errors that bubble up will rollback the main commit round.
+ $fname = __METHOD__;
+ $dbw->onTransactionPreCommitOrIdle(
+ function () use ( $dbw, $jobs, $flags, $fname ) {
+ $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
+ },
+ $fname
);
}
'job_title' => $job->getTitle()->getDBkey(),
'job_params' => self::makeBlob( $job->getParams() ),
// Additional job metadata
- 'job_id' => $dbw->nextSequenceValue( 'job_job_id_seq' ),
'job_timestamp' => $dbw->timestamp(),
'job_sha1' => Wikimedia\base_convert(
sha1( serialize( $job->getDeduplicationInfo() ) ),
? $lbFactory->getExternalLB( $this->cluster )
: $lbFactory->getMainLB( $this->wiki );
- return $lb->getConnectionRef( $index, [], $this->wiki );
+ return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
+ // Keep a separate connection to avoid contention and deadlocks;
+ // However, SQLite has the opposite behavior due to DB-level locking.
+ ? $lb->getConnectionRef( $index, [], $this->wiki, $lb::CONN_TRX_AUTOCOMMIT )
+ // Jobs insertion will be defered until the PRESEND stage to reduce contention.
+ : $lb->getConnectionRef( $index, [], $this->wiki );
}
/**