* Kind of like Hawking's [[Chronology Protection Agency]].
*/
class ChronologyProtector {
- /** @var array (DB master name => position) */
- protected $startupPositions = array();
+ /** @var BagOStuff */
+ protected $store;
- /** @var array (DB master name => position) */
- protected $shutdownPositions = array();
+ /** @var string Storage key name */
+ protected $key;
+ /** @var array Map of (ip: <IP>, agent: <user-agent>) */
+ protected $client;
+ /** @var bool Whether to no-op all method calls */
+ protected $enabled = true;
+ /** @var bool Whether to check and wait on positions */
+ protected $wait = true;
- /** @var bool Whether the session data was loaded */
+ /** @var bool Whether the client data was loaded */
protected $initialized = false;
+ /** @var DBMasterPos[] Map of (DB master name => position) */
+ protected $startupPositions = array();
+ /** @var DBMasterPos[] Map of (DB master name => position) */
+ protected $shutdownPositions = array();
+
+ /**
+ * @param BagOStuff $store
+ * @param array $client Map of (ip: <IP>, agent: <user-agent>)
+ * @since 1.27
+ */
+ public function __construct( BagOStuff $store, array $client ) {
+ $this->store = $store;
+ $this->client = $client;
+ $this->key = $store->makeGlobalKey(
+ 'ChronologyProtector',
+ md5( $client['ip'] . "\n" . $client['agent'] )
+ );
+ }
+
+ /**
+ * @param bool $enabled Whether to no-op all method calls
+ * @since 1.27
+ */
+ public function setEnabled( $enabled ) {
+ $this->enabled = $enabled;
+ }
+
+ /**
+ * @param bool $enabled Whether to check and wait on positions
+ * @since 1.27
+ */
+ public function setWaitEnabled( $enabled ) {
+ $this->wait = $enabled;
+ }
/**
* Initialise a LoadBalancer to give it appropriate chronology protection.
*
- * If the session has a previous master position recorded, this will try to
+ * If the stash has a previous master position recorded, this will try to
* make sure that the next query to a slave of that master will see changes up
* to that position by delaying execution. The delay may timeout and allow stale
* data if no non-lagged slaves are available.
* @return void
*/
public function initLB( LoadBalancer $lb ) {
- if ( $lb->getServerCount() <= 1 ) {
- return; // non-replicated setup
- }
- if ( !$this->initialized ) {
- $this->initialized = true;
- if ( isset( $_SESSION[__CLASS__] ) && is_array( $_SESSION[__CLASS__] ) ) {
- $this->startupPositions = $_SESSION[__CLASS__];
- }
+ if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
+ return; // non-replicated setup or disabled
}
- $masterName = $lb->getServerName( 0 );
+
+ $this->initPositions();
+
+ $masterName = $lb->getServerName( $lb->getWriterIndex() );
if ( !empty( $this->startupPositions[$masterName] ) ) {
$info = $lb->parentInfo();
$pos = $this->startupPositions[$masterName];
- wfDebug( __METHOD__ . ": LB '" . $info['id'] . "' waiting for master pos $pos\n" );
+ wfDebugLog( 'replication', __METHOD__ .
+ ": LB '" . $info['id'] . "' waiting for master pos $pos\n" );
$lb->waitFor( $pos );
}
}
* @return void
*/
public function shutdownLB( LoadBalancer $lb ) {
- if ( session_id() == '' || $lb->getServerCount() <= 1 ) {
- return; // don't start a session; don't bother with non-replicated setups
- }
- $masterName = $lb->getServerName( 0 );
- if ( isset( $this->shutdownPositions[$masterName] ) ) {
- return; // already done
+ if ( !$this->enabled || $lb->getServerCount() <= 1 ) {
+ return; // non-replicated setup or disabled
}
- // Only save the position if writes have been done on the connection
- $db = $lb->getAnyOpenConnection( 0 );
+
$info = $lb->parentInfo();
+ $masterName = $lb->getServerName( $lb->getWriterIndex() );
+
+ // Only save the position if writes have been done on the connection
+ $db = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
if ( !$db || !$db->doneWrites() ) {
- wfDebug( __METHOD__ . ": LB {$info['id']}, no writes done\n" );
+ wfDebugLog( 'replication', __METHOD__ . ": LB {$info['id']}, no writes done\n" );
- return;
+ return; // nothing to do
}
+
$pos = $db->getMasterPos();
- wfDebug( __METHOD__ . ": LB {$info['id']} has master pos $pos\n" );
+ wfDebugLog( 'replication', __METHOD__ . ": LB {$info['id']} has master pos $pos\n" );
$this->shutdownPositions[$masterName] = $pos;
}
* Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
* May commit chronology data to persistent storage.
*
- * @return void
+ * @return array Empty on success; returns the (db name => position) map on failure
*/
public function shutdown() {
- if ( session_id() != '' && count( $this->shutdownPositions ) ) {
- wfDebug( __METHOD__ . ": saving master pos for " .
- count( $this->shutdownPositions ) . " master(s)\n" );
- $_SESSION[__CLASS__] = $this->shutdownPositions;
+ if ( !$this->enabled || !count( $this->shutdownPositions ) ) {
+ return true; // nothing to save
+ }
+
+ wfDebugLog( 'replication',
+ __METHOD__ . ": saving master pos for " .
+ implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
+ );
+
+ $shutdownPositions = $this->shutdownPositions;
+ $ok = $this->store->merge(
+ $this->key,
+ function ( $store, $key, $curValue ) use ( $shutdownPositions ) {
+ /** @var $curPositions DBMasterPos[] */
+ if ( $curValue === false ) {
+ $curPositions = $shutdownPositions;
+ } else {
+ $curPositions = $curValue['positions'];
+ // Use the newest positions for each DB master
+ foreach ( $shutdownPositions as $db => $pos ) {
+ if ( !isset( $curPositions[$db] )
+ || $pos->asOfTime() > $curPositions[$db]->asOfTime()
+ ) {
+ $curPositions[$db] = $pos;
+ }
+ }
+ }
+
+ return array( 'positions' => $curPositions );
+ },
+ BagOStuff::TTL_MINUTE,
+ 10,
+ BagOStuff::WRITE_SYNC // visible in all datacenters
+ );
+
+ if ( !$ok ) {
+ // Raced out too many times or stash is down
+ wfDebugLog( 'replication',
+ __METHOD__ . ": failed to save master pos for " .
+ implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
+ );
+
+ return $this->shutdownPositions;
+ }
+
+ return array();
+ }
+
+ /**
+ * Load in previous master positions for the client
+ */
+ protected function initPositions() {
+ if ( $this->initialized ) {
+ return;
+ }
+
+ $this->initialized = true;
+ if ( $this->wait ) {
+ $data = $this->store->get( $this->key );
+ $this->startupPositions = $data ? $data['positions'] : array();
+
+ wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (read)\n" );
+ } else {
+ $this->startupPositions = array();
+
+ wfDebugLog( 'replication', __METHOD__ . ": key is {$this->key} (unread)\n" );
}
}
}