/** @var callable[] */
private $replicationWaitCallbacks = [];
- /** @var mixed */
+ /** var int An identifier for this class instance */
+ private $id;
+ /** @var int|null Ticket used to delegate transaction ownership */
private $ticket;
/** @var string|bool String if a requested DBO_TRX transaction round is active */
private $trxRoundId = false;
$this->defaultGroup = $conf['defaultGroup'] ?? null;
$this->secret = $conf['secret'] ?? '';
+ $this->id = mt_rand();
$this->ticket = mt_rand();
}
}
$this->trxRoundId = $fname;
// Set DBO_TRX flags on all appropriate DBs
- $this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
+ $this->forEachLBCallMethod( 'beginMasterChanges', [ $fname, $this->id ] );
$this->trxRoundStage = self::ROUND_CURSORY;
}
// Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
do {
$count = 0; // number of callbacks executed this iteration
- $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$count ) {
- $count += $lb->finalizeMasterChanges();
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$count, $fname ) {
+ $count += $lb->finalizeMasterChanges( $fname, $this->id );
} );
} while ( $count > 0 );
$this->trxRoundId = false;
// Perform pre-commit checks, aborting on failure
- $this->forEachLBCallMethod( 'approveMasterChanges', [ $options ] );
+ $this->forEachLBCallMethod( 'approveMasterChanges', [ $options, $fname, $this->id ] );
// Log the DBs and methods involved in multi-DB transactions
$this->logIfMultiDbTransaction();
// Actually perform the commit on all master DB connections and revert DBO_TRX
- $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
+ $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname, $this->id ] );
// Run all post-commit callbacks in a separate step
$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
$e = $this->executePostTransactionCallbacks();
$this->trxRoundStage = self::ROUND_ROLLING_BACK;
$this->trxRoundId = false;
// Actually perform the rollback on all master DB connections and revert DBO_TRX
- $this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname ] );
+ $this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname, $this->id ] );
// Run all post-commit callbacks in a separate step
$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
$this->executePostTransactionCallbacks();
* @return Exception|null
*/
private function executePostTransactionCallbacks() {
+ $fname = __METHOD__;
// Run all post-commit callbacks until new ones stop getting added
$e = null; // first callback exception
do {
- $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) {
- $ex = $lb->runMasterTransactionIdleCallbacks();
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e, $fname ) {
+ $ex = $lb->runMasterTransactionIdleCallbacks( $fname, $this->id );
$e = $e ?: $ex;
} );
} while ( $this->hasMasterChanges() );
// Run all listener callbacks once
- $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) {
- $ex = $lb->runMasterTransactionListenerCallbacks();
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e, $fname ) {
+ $ex = $lb->runMasterTransactionListenerCallbacks( $fname, $this->id );
$e = $e ?: $ex;
} );
// time needed to wait on the next clusters.
$masterPositions = array_fill( 0, count( $lbs ), false );
foreach ( $lbs as $i => $lb ) {
- if ( $lb->getServerCount() <= 1 ) {
- // T29975 - Don't try to wait for replica DBs if there are none
- // Prevents permission error when getting master position
- continue;
- } elseif ( $opts['ifWritesSince']
- && $lb->lastMasterChangeTimestamp() < $opts['ifWritesSince']
+ if ( !$lb->hasStreamingReplicaServers() ) {
+ continue; // T29975: no replication; avoid getMasterPos() permissions errors
+ } elseif (
+ $opts['ifWritesSince'] &&
+ $lb->lastMasterChangeTimestamp() < $opts['ifWritesSince']
) {
continue; // no writes since the last wait
}
[
'ip' => $this->requestInfo['IPAddress'],
'agent' => $this->requestInfo['UserAgent'],
- 'clientId' => $this->requestInfo['ChronologyClientId']
+ 'clientId' => $this->requestInfo['ChronologyClientId'] ?: null
],
$this->requestInfo['ChronologyPositionIndex'],
$this->secret
) {
// Record all the master positions needed
$this->forEachLB( function ( ILoadBalancer $lb ) use ( $cp ) {
- $cp->shutdownLB( $lb );
+ $cp->storeSessionReplicationPosition( $lb );
} );
// Write them to the persistent stash. Try to do something useful by running $work
// while ChronologyProtector waits for the stash write to replicate to all DCs.
'chronologyCallback' => function ( ILoadBalancer $lb ) {
// Defer ChronologyProtector construction in case setRequestInfo() ends up
// being called later (but before the first connection attempt) (T192611)
- $this->getChronologyProtector()->initLB( $lb );
+ $this->getChronologyProtector()->applySessionReplicationPosition( $lb );
},
- 'roundStage' => $initStage
+ 'roundStage' => $initStage,
+ 'ownerId' => $this->id
];
}
*/
protected function initLoadBalancer( ILoadBalancer $lb ) {
if ( $this->trxRoundId !== false ) {
- $lb->beginMasterChanges( $this->trxRoundId ); // set DBO_TRX
+ $lb->beginMasterChanges( $this->trxRoundId, $this->id ); // set DBO_TRX
}
$lb->setTableAliases( $this->tableAliases );
public function appendShutdownCPIndexAsQuery( $url, $index ) {
$usedCluster = 0;
$this->forEachLB( function ( ILoadBalancer $lb ) use ( &$usedCluster ) {
- $usedCluster |= ( $lb->getServerCount() > 1 );
+ $usedCluster |= $lb->hasStreamingReplicaServers();
} );
if ( !$usedCluster ) {