/** @var callable[] */
protected $replicationWaitCallbacks = [];
- const SHUTDOWN_NO_CHRONPROT = 1; // don't save ChronologyProtector positions (for async code)
+ const SHUTDOWN_NO_CHRONPROT = 0; // don't save DB positions at all
+ const SHUTDOWN_CHRONPROT_ASYNC = 1; // save DB positions, but don't wait on remote DCs
+ const SHUTDOWN_CHRONPROT_SYNC = 2; // save DB positions, waiting on all DCs
/**
* Construct a factory based on a configuration array (typically from $wgLBFactoryConf)
* @see LoadBalancer::disable()
*/
public function destroy() {
- $this->shutdown();
+ $this->shutdown( self::SHUTDOWN_NO_CHRONPROT );
$this->forEachLBCallMethod( 'disable' );
}
* @param bool|string $wiki Wiki ID, or false for the current wiki
* @return LoadBalancer
*/
- abstract public function &getExternalLB( $cluster, $wiki = false );
+ abstract public function getExternalLB( $cluster, $wiki = false );
/**
* Execute a function for each tracked load balancer
/**
* Prepare all tracked load balancers for shutdown
- * @param integer $flags Supports SHUTDOWN_* flags
+ * @param integer $mode One of the class SHUTDOWN_* constants
+ * @param callable|null $workCallback Work to mask ChronologyProtector writes
*/
- public function shutdown( $flags = 0 ) {
- if ( !( $flags & self::SHUTDOWN_NO_CHRONPROT ) ) {
- $this->shutdownChronologyProtector( $this->chronProt );
+ public function shutdown(
+ $mode = self::SHUTDOWN_CHRONPROT_SYNC, callable $workCallback = null
+ ) {
+ if ( $mode === self::SHUTDOWN_CHRONPROT_SYNC ) {
+ $this->shutdownChronologyProtector( $this->chronProt, $workCallback, 'sync' );
+ } elseif ( $mode === self::SHUTDOWN_CHRONPROT_ASYNC ) {
+ $this->shutdownChronologyProtector( $this->chronProt, null, 'async' );
}
+
$this->commitMasterChanges( __METHOD__ ); // sanity
}
);
}
+ /**
+ * Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot
+ *
+ * @param string $fname Caller name
+ * @since 1.28
+ */
+ public function flushReplicaSnapshots( $fname = __METHOD__ ) {
+ $this->forEachLBCallMethod( 'flushReplicaSnapshots', [ $fname ] );
+ }
+
+ /**
+ * Commit on all connections. Done for two reasons:
+ * 1. To commit changes to the masters.
+ * 2. To release the snapshot on all connections, master and replica DB.
+ * @param string $fname Caller name
+ * @param array $options Options map:
+ * - maxWriteDuration: abort if more than this much time was spent in write queries
+ */
+ public function commitAll( $fname = __METHOD__, array $options = [] ) {
+ $this->commitMasterChanges( $fname, $options );
+ $this->forEachLBCallMethod( 'commitAll', [ $fname ] );
+ }
+
/**
* Flush any master transaction snapshots and set DBO_TRX (if DBO_DEFAULT is set)
*
* - commitMasterChanges()
* - rollbackMasterChanges()
* - commitAll()
+ *
* This allows for custom transaction rounds from any outer transaction scope.
*
* @param string $fname
if ( $this->trxRoundId !== false ) {
throw new DBTransactionError(
null,
- "Transaction round '{$this->trxRoundId}' already started."
+ "$fname: transaction round '{$this->trxRoundId}' already started."
);
}
$this->trxRoundId = $fname;
$this->forEachLBCallMethod( 'beginMasterChanges', [ $fname ] );
}
- /**
- * Commit all replica DB transactions so as to flush any REPEATABLE-READ or SSI snapshot
- *
- * @param string $fname Caller name
- * @since 1.28
- */
- public function flushReplicaSnapshots( $fname = __METHOD__ ) {
- $this->forEachLBCallMethod( 'flushReplicaSnapshots', [ $fname ] );
- }
-
- /**
- * Commit on all connections. Done for two reasons:
- * 1. To commit changes to the masters.
- * 2. To release the snapshot on all connections, master and replica DB.
- * @param string $fname Caller name
- * @param array $options Options map:
- * - maxWriteDuration: abort if more than this much time was spent in write queries
- */
- public function commitAll( $fname = __METHOD__, array $options = [] ) {
- $this->commitMasterChanges( $fname, $options );
- $this->forEachLBCallMethod( 'commitAll', [ $fname ] );
- }
-
/**
* Commit changes on all master connections
* @param string $fname Caller name
* @throws Exception
*/
public function commitMasterChanges( $fname = __METHOD__, array $options = [] ) {
+ if ( $this->trxRoundId !== false && $this->trxRoundId !== $fname ) {
+ throw new DBTransactionError(
+ null,
+ "$fname: transaction round '{$this->trxRoundId}' still running."
+ );
+ }
// Run pre-commit callbacks and suppress post-commit callbacks, aborting on failure
$this->forEachLBCallMethod( 'finalizeMasterChanges' );
$this->trxRoundId = false;
/**
* Detemine if any lagged replica DB connection was used
- * @since 1.27
* @return bool
+ * @since 1.28
*/
- public function laggedSlaveUsed() {
+ public function laggedReplicaUsed() {
$ret = false;
$this->forEachLB( function ( LoadBalancer $lb ) use ( &$ret ) {
- $ret = $ret || $lb->laggedSlaveUsed();
+ $ret = $ret || $lb->laggedReplicaUsed();
} );
return $ret;
}
+ /**
+ * @return bool
+ * @since 1.27
+ * @deprecated Since 1.28; use laggedReplicaUsed()
+ */
+ public function laggedSlaveUsed() {
+ return $this->laggedReplicaUsed();
+ }
+
/**
* Determine if any master connection has pending/written changes from this request
+ * @param float $age How many seconds ago is "recent" [defaults to LB lag wait timeout]
* @return bool
* @since 1.27
*/
- public function hasOrMadeRecentMasterChanges() {
+ public function hasOrMadeRecentMasterChanges( $age = null ) {
$ret = false;
- $this->forEachLB( function ( LoadBalancer $lb ) use ( &$ret ) {
- $ret = $ret || $lb->hasOrMadeRecentMasterChanges();
+ $this->forEachLB( function ( LoadBalancer $lb ) use ( $age, &$ret ) {
+ $ret = $ret || $lb->hasOrMadeRecentMasterChanges( $age );
} );
return $ret;
}
* This will commit and wait unless $ticket indicates it is unsafe to do so
*
* @param string $fname Caller name (e.g. __METHOD__)
- * @param mixed $ticket Result of getOuterTransactionScopeTicket()
+ * @param mixed $ticket Result of getEmptyTransactionTicket()
* @param array $opts Options to waitForReplication()
* @throws DBReplicationWaitError
* @since 1.28
return;
}
- $this->commitMasterChanges( $fname );
+ // The transaction owner and any caller with the empty transaction ticket can commit
+ // so that getEmptyTransactionTicket() callers don't risk seeing DBTransactionError.
+ if ( $this->trxRoundId !== false && $fname !== $this->trxRoundId ) {
+ $this->trxLogger->info( "$fname: committing on behalf of {$this->trxRoundId}." );
+ $fnameEffective = $this->trxRoundId;
+ } else {
+ $fnameEffective = $fname;
+ }
+
+ $this->commitMasterChanges( $fnameEffective );
$this->waitForReplication( $opts );
+ // If a nested caller committed on behalf of $fname, start another empty $fname
+ // transaction, leaving the caller with the same empty transaction state as before.
+ if ( $fnameEffective !== $fname ) {
+ $this->beginMasterChanges( $fnameEffective );
+ }
+ }
+
+ /**
+ * @param string $dbName DB master name (e.g. "db1052")
+ * @return float|bool UNIX timestamp when client last touched the DB or false if not recent
+ * @since 1.28
+ */
+ public function getChronologyProtectorTouched( $dbName ) {
+ return $this->chronProt->getTouched( $dbName );
}
/**
ObjectCache::getMainStashInstance(),
[
'ip' => $request->getIP(),
- 'agent' => $request->getHeader( 'User-Agent' )
- ]
+ 'agent' => $request->getHeader( 'User-Agent' ),
+ ],
+ $request->getFloat( 'cpPosTime', null )
);
if ( PHP_SAPI === 'cli' ) {
$chronProt->setEnabled( false );
}
/**
+ * Get and record all of the staged DB positions into persistent memory storage
+ *
* @param ChronologyProtector $cp
+ * @param callable|null $workCallback Work to do instead of waiting on syncing positions
+ * @param string $mode One of (sync, async); whether to wait on remote datacenters
*/
- protected function shutdownChronologyProtector( ChronologyProtector $cp ) {
- // Get all the master positions needed
+ protected function shutdownChronologyProtector(
+ ChronologyProtector $cp, $workCallback, $mode
+ ) {
+ // Record all the master positions needed
$this->forEachLB( function ( LoadBalancer $lb ) use ( $cp ) {
$cp->shutdownLB( $lb );
} );
- // Write them to the stash
- $unsavedPositions = $cp->shutdown();
+ // Write them to the persistent stash. Try to do something useful by running $work
+ // while ChronologyProtector waits for the stash write to replicate to all DCs.
+ $unsavedPositions = $cp->shutdown( $workCallback, $mode );
+ if ( $unsavedPositions && $workCallback ) {
+ // Invoke callback in case it did not cache the result yet
+ $workCallback(); // work now to block for less time in waitForAll()
+ }
// If the positions failed to write to the stash, at least wait on local datacenter
// replica DBs to catch up before responding. Even if there are several DCs, this increases
// the chance that the user will see their own changes immediately afterwards. As long
}
}
+ /**
+ * Append ?cpPosTime parameter to a URL for ChronologyProtector purposes if needed
+ *
+ * Note that unlike cookies, this works accross domains
+ *
+ * @param string $url
+ * @param float $time UNIX timestamp just before shutdown() was called
+ * @return string
+ * @since 1.28
+ */
+ public function appendPreShutdownTimeAsQuery( $url, $time ) {
+ $usedCluster = 0;
+ $this->forEachLB( function ( LoadBalancer $lb ) use ( &$usedCluster ) {
+ $usedCluster |= ( $lb->getServerCount() > 1 );
+ } );
+
+ if ( !$usedCluster ) {
+ return $url; // no master/replica clusters touched
+ }
+
+ return wfAppendQuery( $url, [ 'cpPosTime' => $time ] );
+ }
+
/**
* Close all open database connections on all open load balancers.
* @since 1.28
public function closeAll() {
$this->forEachLBCallMethod( 'closeAll', [] );
}
-
}