namespace Wikimedia\Rdbms;
use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
use Wikimedia\ScopedCallback;
use BagOStuff;
use EmptyBagOStuff;
private $cliMode;
/** @var string Agent name for query profiling */
private $agent;
+ /** @var string Secret string for HMAC hashing */
+ private $secret;
/** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
private $tableAliases = [];
/** @var callable[] */
private $replicationWaitCallbacks = [];
- /** @var mixed */
+ /** var int An identifier for this class instance */
+ private $id;
+ /** @var int|null Ticket used to delegate transaction ownership */
private $ticket;
/** @var string|bool String if a requested DBO_TRX transaction round is active */
private $trxRoundId = false;
$this->wanCache = $conf['wanCache'] ?? WANObjectCache::newEmpty();
foreach ( self::$loggerFields as $key ) {
- $this->$key = $conf[$key] ?? new \Psr\Log\NullLogger();
+ $this->$key = $conf[$key] ?? new NullLogger();
}
$this->errorLogger = $conf['errorLogger'] ?? function ( Exception $e ) {
trigger_error( get_class( $e ) . ': ' . $e->getMessage(), E_USER_WARNING );
$this->hostname = $conf['hostname'] ?? gethostname();
$this->agent = $conf['agent'] ?? '';
$this->defaultGroup = $conf['defaultGroup'] ?? null;
+ $this->secret = $conf['secret'] ?? '';
+ $this->id = mt_rand();
$this->ticket = mt_rand();
}
}
public function flushReplicaSnapshots( $fname = __METHOD__ ) {
+ if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
+ $this->queryLogger->warning(
+ "$fname: transaction round '{$this->trxRoundId}' still running",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
+ }
$this->forEachLBCallMethod( 'flushReplicaSnapshots', [ $fname ] );
}
if ( $this->trxRoundId !== false ) {
throw new DBTransactionError(
null,
- "$fname: transaction round '{$this->trxRoundId}' already started."
+ "$fname: transaction round '{$this->trxRoundId}' already started"
);
}
$this->trxRoundId = $fname;
// Set DBO_TRX flags on all appropriate DBs
- $this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
+ $this->forEachLBCallMethod( 'beginMasterChanges', [ $fname, $this->id ] );
$this->trxRoundStage = self::ROUND_CURSORY;
}
if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
throw new DBTransactionError(
null,
- "$fname: transaction round '{$this->trxRoundId}' still running."
+ "$fname: transaction round '{$this->trxRoundId}' still running"
);
}
/** @noinspection PhpUnusedLocalVariableInspection */
// Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
do {
$count = 0; // number of callbacks executed this iteration
- $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$count ) {
- $count += $lb->finalizeMasterChanges();
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$count, $fname ) {
+ $count += $lb->finalizeMasterChanges( $fname, $this->id );
} );
} while ( $count > 0 );
$this->trxRoundId = false;
// Perform pre-commit checks, aborting on failure
- $this->forEachLBCallMethod( 'approveMasterChanges', [ $options ] );
+ $this->forEachLBCallMethod( 'approveMasterChanges', [ $options, $fname, $this->id ] );
// Log the DBs and methods involved in multi-DB transactions
$this->logIfMultiDbTransaction();
// Actually perform the commit on all master DB connections and revert DBO_TRX
- $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname ] );
+ $this->forEachLBCallMethod( 'commitMasterChanges', [ $fname, $this->id ] );
// Run all post-commit callbacks in a separate step
$this->trxRoundStage = self::ROUND_COMMIT_CALLBACKS;
$e = $this->executePostTransactionCallbacks();
$this->trxRoundStage = self::ROUND_ROLLING_BACK;
$this->trxRoundId = false;
// Actually perform the rollback on all master DB connections and revert DBO_TRX
- $this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname ] );
+ $this->forEachLBCallMethod( 'rollbackMasterChanges', [ $fname, $this->id ] );
// Run all post-commit callbacks in a separate step
$this->trxRoundStage = self::ROUND_ROLLBACK_CALLBACKS;
$this->executePostTransactionCallbacks();
* @return Exception|null
*/
private function executePostTransactionCallbacks() {
+ $fname = __METHOD__;
// Run all post-commit callbacks until new ones stop getting added
$e = null; // first callback exception
do {
- $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) {
- $ex = $lb->runMasterTransactionIdleCallbacks();
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e, $fname ) {
+ $ex = $lb->runMasterTransactionIdleCallbacks( $fname, $this->id );
$e = $e ?: $ex;
} );
} while ( $this->hasMasterChanges() );
// Run all listener callbacks once
- $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e ) {
- $ex = $lb->runMasterTransactionListenerCallbacks();
+ $this->forEachLB( function ( ILoadBalancer $lb ) use ( &$e, $fname ) {
+ $ex = $lb->runMasterTransactionListenerCallbacks( $fname, $this->id );
$e = $e ?: $ex;
} );
// time needed to wait on the next clusters.
$masterPositions = array_fill( 0, count( $lbs ), false );
foreach ( $lbs as $i => $lb ) {
- if ( $lb->getServerCount() <= 1 ) {
- // T29975 - Don't try to wait for replica DBs if there are none
- // Prevents permission error when getting master position
- continue;
- } elseif ( $opts['ifWritesSince']
- && $lb->lastMasterChangeTimestamp() < $opts['ifWritesSince']
+ if ( !$lb->hasStreamingReplicaServers() ) {
+ continue; // T29975: no replication; avoid getMasterPos() permissions errors
+ } elseif (
+ $opts['ifWritesSince'] &&
+ $lb->lastMasterChangeTimestamp() < $opts['ifWritesSince']
) {
continue; // no writes since the last wait
}
public function getEmptyTransactionTicket( $fname ) {
if ( $this->hasMasterChanges() ) {
- $this->queryLogger->error( __METHOD__ . ": $fname does not have outer scope.\n" .
- ( new RuntimeException() )->getTraceAsString() );
+ $this->queryLogger->error(
+ __METHOD__ . ": $fname does not have outer scope",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
return null;
}
final public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
if ( $ticket !== $this->ticket ) {
- $this->perfLogger->error( __METHOD__ . ": $fname does not have outer scope.\n" .
- ( new RuntimeException() )->getTraceAsString() );
+ $this->perfLogger->error(
+ __METHOD__ . ": $fname does not have outer scope",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
- return;
+ return false;
}
// The transaction owner and any caller with the empty transaction ticket can commit
// so that getEmptyTransactionTicket() callers don't risk seeing DBTransactionError.
if ( $this->trxRoundId !== false && $fname !== $this->trxRoundId ) {
- $this->queryLogger->info( "$fname: committing on behalf of {$this->trxRoundId}." );
+ $this->queryLogger->info( "$fname: committing on behalf of {$this->trxRoundId}" );
$fnameEffective = $this->trxRoundId;
} else {
$fnameEffective = $fname;
if ( $fnameEffective !== $fname ) {
$this->beginMasterChanges( $fnameEffective );
}
+
return $waitSucceeded;
}
[
'ip' => $this->requestInfo['IPAddress'],
'agent' => $this->requestInfo['UserAgent'],
- 'clientId' => $this->requestInfo['ChronologyClientId']
+ 'clientId' => $this->requestInfo['ChronologyClientId'] ?: null
],
- $this->requestInfo['ChronologyPositionIndex']
+ $this->requestInfo['ChronologyPositionIndex'],
+ $this->secret
);
$this->chronProt->setLogger( $this->replLogger );
} elseif ( $this->memStash instanceof EmptyBagOStuff ) {
// No where to store any DB positions and wait for them to appear
$this->chronProt->setEnabled( false );
- $this->replLogger->info( 'Cannot use ChronologyProtector with EmptyBagOStuff.' );
+ $this->replLogger->info( 'Cannot use ChronologyProtector with EmptyBagOStuff' );
}
- $this->replLogger->debug( __METHOD__ . ': using request info ' .
- json_encode( $this->requestInfo, JSON_PRETTY_PRINT ) );
+ $this->replLogger->debug(
+ __METHOD__ . ': request info ' .
+ json_encode( $this->requestInfo, JSON_PRETTY_PRINT )
+ );
return $this->chronProt;
}
) {
// Record all the master positions needed
$this->forEachLB( function ( ILoadBalancer $lb ) use ( $cp ) {
- $cp->shutdownLB( $lb );
+ $cp->storeSessionReplicationPosition( $lb );
} );
// Write them to the persistent stash. Try to do something useful by running $work
// while ChronologyProtector waits for the stash write to replicate to all DCs.
'chronologyCallback' => function ( ILoadBalancer $lb ) {
// Defer ChronologyProtector construction in case setRequestInfo() ends up
// being called later (but before the first connection attempt) (T192611)
- $this->getChronologyProtector()->initLB( $lb );
+ $this->getChronologyProtector()->applySessionReplicationPosition( $lb );
},
- 'roundStage' => $initStage
+ 'roundStage' => $initStage,
+ 'ownerId' => $this->id
];
}
*/
protected function initLoadBalancer( ILoadBalancer $lb ) {
if ( $this->trxRoundId !== false ) {
- $lb->beginMasterChanges( $this->trxRoundId ); // set DBO_TRX
+ $lb->beginMasterChanges( $this->trxRoundId, $this->id ); // set DBO_TRX
}
$lb->setTableAliases( $this->tableAliases );
public function appendShutdownCPIndexAsQuery( $url, $index ) {
$usedCluster = 0;
$this->forEachLB( function ( ILoadBalancer $lb ) use ( &$usedCluster ) {
- $usedCluster |= ( $lb->getServerCount() > 1 );
+ $usedCluster |= $lb->hasStreamingReplicaServers();
} );
if ( !$usedCluster ) {
public function setRequestInfo( array $info ) {
if ( $this->chronProt ) {
- throw new LogicException( 'ChronologyProtector already initialized.' );
+ throw new LogicException( 'ChronologyProtector already initialized' );
}
$this->requestInfo = $info + $this->requestInfo;