Merge "Make DBAccessBase use DBConnRef, rename $wiki, and hide getLoadBalancer()"
[lhc/web/wiklou.git] / includes / libs / rdbms / ChronologyProtector.php
1 <?php
2 /**
3 * Helper for mitigating database replication lag in order to provide session consistency.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Database
22 */
23
24 namespace Wikimedia\Rdbms;
25
26 use Psr\Log\LoggerAwareInterface;
27 use Psr\Log\LoggerInterface;
28 use Psr\Log\NullLogger;
29 use Wikimedia\WaitConditionLoop;
30 use BagOStuff;
31
/**
 * Helper class for mitigating DB replication lag in order to provide "session consistency"
 *
 * This helps to ensure a consistent ordering of events as seen by a client
 *
 * Life cycle, as implemented by the methods below:
 *   - applySessionReplicationPosition() loads the positions last stored for this client
 *     and asks the given load balancer to wait for the one matching its master.
 *   - storeSessionReplicationPosition() remembers the replication position of a load
 *     balancer that has (or recently had) master changes.
 *   - shutdown() merges the remembered positions into the position store.
 *
 * Kind of like Hawking's [[Chronology Protection Agency]].
 */
class ChronologyProtector implements LoggerAwareInterface {
	/** @var BagOStuff Store holding the per-client positions and "touched" timestamps */
	protected $store;
	/** @var LoggerInterface */
	protected $logger;

	/** @var string Storage key name */
	protected $key;
	/** @var string Hash of client parameters */
	protected $clientId;
	/** @var string[] Map of client information fields for logging */
	protected $clientLogInfo;
	/** @var int|null Expected minimum index of the last write to the position store */
	protected $waitForPosIndex;
	/** @var int Max seconds to wait on positions to appear */
	protected $waitForPosStoreTimeout = self::POS_STORE_WAIT_TIMEOUT;
	/** @var bool Whether the protector is active; when false the LB/shutdown methods no-op */
	protected $enabled = true;
	/** @var bool Whether to check and wait on positions */
	protected $wait = true;

	/** @var bool Whether the client data was loaded */
	protected $initialized = false;
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $startupPositions = [];
	/** @var DBMasterPos[] Map of (DB master name => position) */
	protected $shutdownPositions = [];
	/** @var int[] Map of (DB master name => 1) */
	protected $shutdownTouchDBs = [];

	/** @var int Seconds to store positions */
	const POSITION_TTL = 60;
	/** @var int Seconds to store position write index cookies (safely less than POSITION_TTL) */
	const POSITION_COOKIE_TTL = 10;
	/** @var int Max time to wait for positions to appear */
	const POS_STORE_WAIT_TIMEOUT = 5;

	/**
	 * The client ID is taken from $client['clientId'] when present; otherwise it is an
	 * MD5 hash (HMAC-MD5 when $secret is non-empty) of the IP and user agent joined by "\n".
	 * Note that 'ip' and 'agent' are always read, since they are kept for logging.
	 *
	 * @param BagOStuff $store
	 * @param array $client Map of (ip: <IP>, agent: <user-agent> [, clientId: <hash>] )
	 * @param int|null $posIndex Write counter index
	 * @param string $secret Secret string for HMAC hashing [optional]
	 * @since 1.27
	 */
	public function __construct( BagOStuff $store, array $client, $posIndex, $secret = '' ) {
		$this->store = $store;

		if ( isset( $client['clientId'] ) ) {
			$this->clientId = $client['clientId'];
		} else {
			$fingerprint = $client['ip'] . "\n" . $client['agent'];
			$this->clientId = ( $secret != '' )
				? hash_hmac( 'md5', $fingerprint, $secret )
				: md5( $fingerprint );
		}

		$this->key = $store->makeGlobalKey( __CLASS__, $this->clientId, 'v2' );
		$this->waitForPosIndex = $posIndex;

		$this->clientLogInfo = [
			'clientIP' => $client['ip'],
			'clientAgent' => $client['agent'],
			'clientId' => $client['clientId'] ?? null
		];

		$this->logger = new NullLogger();
	}

	/**
	 * @param LoggerInterface $logger Logger that replaces the default NullLogger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
	}

	/**
	 * @return string Client ID hash
	 * @since 1.32
	 */
	public function getClientId() {
		return $this->clientId;
	}

	/**
	 * @param bool $enabled Whether the protector is active; when false,
	 *  applySessionReplicationPosition(), storeSessionReplicationPosition() and
	 *  shutdown() return immediately
	 * @since 1.27
	 */
	public function setEnabled( $enabled ) {
		$this->enabled = $enabled;
	}

	/**
	 * @param bool $enabled Whether to check and wait on positions
	 * @since 1.27
	 */
	public function setWaitEnabled( $enabled ) {
		$this->wait = $enabled;
	}

	/**
	 * Apply the "session consistency" DB replication position to a new ILoadBalancer
	 *
	 * If the stash has a previous master position recorded, this will try to make
	 * sure that the next query to a replica DB of that master will see changes up
	 * to that position by delaying execution. The delay may timeout and allow stale
	 * data if no non-lagged replica DBs are available.
	 *
	 * This method should only be called from LBFactory.
	 *
	 * @param ILoadBalancer $lb
	 * @return void
	 */
	public function applySessionReplicationPosition( ILoadBalancer $lb ) {
		if ( !$this->enabled ) {
			return; // disabled
		}

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		$startupPositions = $this->getStartupMasterPositions();

		$pos = $startupPositions[$masterName] ?? null;
		if ( $pos instanceof DBMasterPos ) {
			$this->logger->debug( __METHOD__ . ": pos for DB '$masterName' set to '$pos'\n" );
			$lb->waitFor( $pos );
		}
	}

	/**
	 * Save the "session consistency" DB replication position for an end-of-life ILoadBalancer
	 *
	 * This saves the replication position of the master DB if this request made writes to it.
	 * The master is always recorded as "touched" in that case, even when no position
	 * could be obtained or the load balancer has no streaming replica servers.
	 *
	 * This method should only be called from LBFactory.
	 *
	 * @param ILoadBalancer $lb
	 * @return void
	 */
	public function storeSessionReplicationPosition( ILoadBalancer $lb ) {
		if ( !$this->enabled ) {
			return; // disabled
		} elseif ( !$lb->hasOrMadeRecentMasterChanges( INF ) ) {
			// Only save the position if writes have been done on the connection
			return;
		}

		$masterName = $lb->getServerName( $lb->getWriterIndex() );
		if ( $lb->hasStreamingReplicaServers() ) {
			$pos = $lb->getReplicaResumePos();
			if ( $pos ) {
				$this->logger->debug( __METHOD__ . ": LB for '$masterName' has pos $pos\n" );
				$this->shutdownPositions[$masterName] = $pos;
			}
		} else {
			$this->logger->debug( __METHOD__ . ": DB '$masterName' touched\n" );
		}
		$this->shutdownTouchDBs[$masterName] = 1;
	}

	/**
	 * Notify the ChronologyProtector that the LBFactory is done calling shutdownLB() for now.
	 * May commit chronology data to persistent storage.
	 *
	 * The "touched" timestamps are always written first. The positions are then merged
	 * into the stored value while holding a lock on the position key; the lock is released
	 * even if the store or the merge throws.
	 *
	 * @param callable|null $workCallback Work to do instead of waiting on syncing positions
	 * @param string $mode One of (sync, async); whether to wait on remote datacenters
	 * @param int|null &$cpIndex DB position key write counter; incremented on update
	 *  and reset to null if nothing could be saved
	 * @return DBMasterPos[] Empty on success; returns the (db name => position) map on failure
	 */
	public function shutdown( callable $workCallback = null, $mode = 'sync', &$cpIndex = null ) {
		if ( !$this->enabled ) {
			return [];
		}

		$store = $this->store;
		// Some callers might want to know if a user recently touched a DB.
		// These writes do not need to block on all datacenters receiving them.
		foreach ( $this->shutdownTouchDBs as $dbName => $unused ) {
			$store->set(
				$this->getTouchedKey( $store, $dbName ),
				microtime( true ),
				$store::TTL_DAY
			);
		}

		if ( $this->shutdownPositions === [] ) {
			$this->logger->debug( __METHOD__ . ": no master positions to save\n" );

			return []; // nothing to save
		}

		$this->logger->debug(
			__METHOD__ . ": saving master pos for " .
			implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
		);

		// CP-protected writes should overwhelmingly go to the master datacenter, so merge the
		// positions with a DC-local lock, a DC-local get(), and an all-DC set() with WRITE_SYNC.
		// If set() returns success, then any get() should be able to see the new positions.
		if ( $store->lock( $this->key, 3 ) ) {
			try {
				if ( $workCallback ) {
					// Let the store run the work before blocking on a replication sync barrier.
					// If replication caught up while the work finished, the barrier will be fast.
					$store->addBusyCallback( $workCallback );
				}
				$mergedValue = $this->mergePositions(
					$store->get( $this->key ),
					$this->shutdownPositions,
					$cpIndex
				);
				$ok = $store->set(
					$this->key,
					$mergedValue,
					self::POSITION_TTL,
					( $mode === 'sync' ) ? $store::WRITE_SYNC : 0
				);
			} finally {
				// Never leave the key locked, even if get()/set() or the merge throws
				$store->unlock( $this->key );
			}
		} else {
			$ok = false;
		}

		if ( !$ok ) {
			$cpIndex = null; // nothing saved
			$bouncedPositions = $this->shutdownPositions;
			// Raced out too many times or stash is down
			$this->logger->warning( __METHOD__ . ": failed to save master pos for " .
				implode( ', ', array_keys( $this->shutdownPositions ) ) . "\n"
			);
		} elseif ( $mode === 'sync' &&
			$store->getQoS( $store::ATTR_SYNCWRITES ) < $store::QOS_SYNCWRITES_BE
		) {
			// Positions may not be in all datacenters, force LBFactory to play it safe
			$this->logger->info( __METHOD__ . ": store may not support synchronous writes." );
			$bouncedPositions = $this->shutdownPositions;
		} else {
			$bouncedPositions = [];
		}

		return $bouncedPositions;
	}

	/**
	 * @param string $dbName DB master name (e.g. "db1052")
	 * @return float|bool UNIX timestamp when client last touched the DB; false if not on record
	 * @since 1.28
	 */
	public function getTouched( $dbName ) {
		return $this->store->get( $this->getTouchedKey( $this->store, $dbName ) );
	}

	/**
	 * Build the store key under which the "last touched" time of a DB is kept for this client
	 *
	 * @param BagOStuff $store
	 * @param string $dbName
	 * @return string
	 */
	private function getTouchedKey( BagOStuff $store, $dbName ) {
		return $store->makeGlobalKey( __CLASS__, 'mtime', $this->clientId, $dbName );
	}

	/**
	 * Load in previous master positions for the client
	 *
	 * The store is consulted at most once per instance; later calls return the cached map.
	 * When waiting is disabled the store is not read and the map is empty.
	 *
	 * @return DBMasterPos[] Map of (DB master name => position)
	 */
	protected function getStartupMasterPositions() {
		if ( $this->initialized ) {
			return $this->startupPositions;
		}

		$this->initialized = true;
		$this->logger->debug( __METHOD__ . ": client ID is {$this->clientId} (read)\n" );

		if ( !$this->wait ) {
			$this->startupPositions = [];
			$this->logger->debug( __METHOD__ . ": key is {$this->key} (unread)\n" );

			return $this->startupPositions;
		}

		// If there is an expectation to see master positions from a certain write
		// index or higher, then block until it appears, or until a timeout is reached.
		// Since the write index restarts each time the key is created, it is possible that
		// a lagged store has a matching key write index. However, in that case, it should
		// already be expired and thus treated as non-existing, maintaining correctness.
		if ( $this->waitForPosIndex > 0 ) {
			$data = null;
			$indexReached = null; // highest index reached in the position store
			$loop = new WaitConditionLoop(
				function () use ( &$data, &$indexReached ) {
					$data = $this->store->get( $this->key );
					if ( !is_array( $data ) ) {
						return WaitConditionLoop::CONDITION_CONTINUE; // not found yet
					} elseif ( !isset( $data['writeIndex'] ) ) {
						return WaitConditionLoop::CONDITION_REACHED; // b/c
					}
					$indexReached = max( $data['writeIndex'], $indexReached );

					return ( $data['writeIndex'] >= $this->waitForPosIndex )
						? WaitConditionLoop::CONDITION_REACHED
						: WaitConditionLoop::CONDITION_CONTINUE;
				},
				$this->waitForPosStoreTimeout
			);
			$result = $loop->invoke();
			$waitedMs = $loop->getLastWaitTime() * 1e3;

			if ( $result == $loop::CONDITION_REACHED ) {
				$this->logger->debug(
					__METHOD__ . ": expected and found position index.",
					[
						'cpPosIndex' => $this->waitForPosIndex,
						'waitTimeMs' => $waitedMs
					] + $this->clientLogInfo
				);
			} else {
				$this->logger->warning(
					__METHOD__ . ": expected but failed to find position index.",
					[
						'cpPosIndex' => $this->waitForPosIndex,
						'indexReached' => $indexReached,
						'waitTimeMs' => $waitedMs
					] + $this->clientLogInfo
				);
			}
		} else {
			$data = $this->store->get( $this->key );
		}

		// Whatever was last read is used, even after a timeout. Tolerate a missing value
		// (false/null) as well as a stored array that lacks the 'positions' field.
		$this->startupPositions = is_array( $data ) ? ( $data['positions'] ?? [] ) : [];
		$this->logger->debug( __METHOD__ . ": key is {$this->key} (read)\n" );

		return $this->startupPositions;
	}

	/**
	 * Merge the positions gathered during this request into the currently stored value
	 *
	 * For each DB master, the shutdown position replaces the stored one when there is no
	 * valid stored position or when the shutdown position has a greater asOfTime().
	 *
	 * @param array|bool $curValue Value currently in the store (non-array if absent)
	 * @param DBMasterPos[] $shutdownPositions Map of (DB master name => position)
	 * @param int|null &$cpIndex Set to the new write index (previous index, or 0, plus one)
	 * @return array Map of (positions: <DBMasterPos[]>, writeIndex: <int>)
	 */
	protected function mergePositions( $curValue, array $shutdownPositions, &$cpIndex = null ) {
		$hasStoredValue = is_array( $curValue );

		/** @var DBMasterPos[] $curPositions */
		$curPositions = $hasStoredValue ? ( $curValue['positions'] ?? [] ) : [];
		// Use the newest positions for each DB master
		foreach ( $shutdownPositions as $db => $pos ) {
			if (
				!isset( $curPositions[$db] ) ||
				!( $curPositions[$db] instanceof DBMasterPos ) ||
				$pos->asOfTime() > $curPositions[$db]->asOfTime()
			) {
				$curPositions[$db] = $pos;
			}
		}

		$cpIndex = $hasStoredValue ? ( $curValue['writeIndex'] ?? 0 ) : 0;

		return [
			'positions' => $curPositions,
			'writeIndex' => ++$cpIndex
		];
	}
}