Added ORMIterator interface which can be used for type hinting (in particular when...
[lhc/web/wiklou.git] / maintenance / locking / LockServerDaemon.php
1 <?php
2 /**
3 * @file
4 * @ingroup LockManager Maintenance
5 */
6
7 /**
8 * This code should not require MediaWiki setup or PHP files.
9 */
// This daemon is a stand-alone command line program; refuse any other SAPI.
if ( php_sapi_name() !== 'cli' ) {
	die( "This is not a valid entry point.\n" );
}
error_reporting( E_ALL );

// Run the server...
set_time_limit( 0 ); // the daemon loops forever, so lift the execution time limit
LockServerDaemon::init(
	// Every option takes a value when it is present (single ":"). The tuning
	// options used to be declared with "::" (optional value), which getopt()
	// only honours in the "--name=value" form; "--name value" silently produced
	// boolean false, which was then cast to 0. With ":" both forms are accepted,
	// and the options themselves remain optional.
	getopt( '', array(
		'address:', 'port:', 'authKey:',
		'lockTimeout:', 'maxClients:', 'maxBacklog:', 'maxLocks:',
	) )
)->main();
23
/**
 * Simple lock server daemon that accepts lock/unlock requests.
 *
 * Clients send newline-terminated messages of the form
 * "<session>:<key>:<command>:<type>:<values>" and receive a single
 * newline-terminated status string in response.
 */
class LockServerDaemon {
	/** @var resource */
	protected $sock; // socket to listen/accept on
	/** @var Array */
	protected $sessions = array(); // (session => resource)
	/** @var Array */
	protected $deadSessions = array(); // (session => UNIX timestamp)

	/** @var LockHolder */
	protected $lockHolder;

	protected $address; // string IP address
	protected $port; // integer
	protected $authKey; // string key
	protected $lockTimeout; // integer number of seconds
	protected $maxBacklog; // integer
	protected $maxClients; // integer

	protected $startTime; // integer UNIX timestamp
	protected $ticks = 0; // integer counter

	/** @var LockServerDaemon */
	protected static $instance = null;

	/**
	 * Create the singleton daemon instance from command line options.
	 *
	 * Terminates the script with a usage message if any of the required
	 * 'address', 'port' or 'authKey' entries is missing.
	 *
	 * @param $config Array Options as returned by getopt()
	 * @return LockServerDaemon
	 * @throws Exception If the daemon was already initialized
	 */
	public static function init( array $config ) {
		if ( self::$instance ) {
			throw new Exception( 'LockServer already initialized.' );
		}
		foreach ( array( 'address', 'port', 'authKey' ) as $par ) {
			if ( !isset( $config[$par] ) ) {
				// Option names are case-sensitive ("authKey"); the optional
				// settings are shown in "=" form, which getopt() always accepts.
				die( "Usage: php LockServerDaemon.php " .
					"--address <address> --port <port> --authKey <key> " .
					"[--lockTimeout=<seconds>] " .
					"[--maxLocks=<integer>] [--maxClients=<integer>] [--maxBacklog=<integer>]\n"
				);
			}
		}
		self::$instance = new self( $config );
		return self::$instance;
	}

	/**
	 * @param $config Array Options; see init()
	 */
	protected function __construct( array $config ) {
		// Required parameters...
		$this->address = $config['address'];
		$this->port = $config['port'];
		$this->authKey = $config['authKey'];
		// Parameters with defaults...
		$this->lockTimeout = self::intOption( $config, 'lockTimeout', 60 );
		$this->maxClients = self::intOption( $config, 'maxClients', 1000 ); // less than default FD_SETSIZE
		$this->maxBacklog = self::intOption( $config, 'maxBacklog', 100 );
		$maxLocks = self::intOption( $config, 'maxLocks', 10000 );

		$this->lockHolder = new LockHolder( $maxLocks );
	}

	/**
	 * Read an integer option, falling back to a default when it is absent.
	 *
	 * getopt() reports an option that was given without a usable value as
	 * boolean false; casting that to 0 would silently disable the daemon
	 * (no clients, no locks), so such entries are treated as "not set".
	 *
	 * @param $config Array
	 * @param $name string Option name
	 * @param $default integer
	 * @return integer
	 */
	protected static function intOption( array $config, $name, $default ) {
		if ( isset( $config[$name] ) && is_scalar( $config[$name] ) && $config[$name] !== false ) {
			return (int)$config[$name];
		}
		return $default;
	}

	/**
	 * Create, bind and start listening on the non-blocking server socket.
	 *
	 * @return void
	 * @throws Exception If the sockets extension is missing or a socket call fails
	 */
	protected function setupServerSocket() {
		if ( !function_exists( 'socket_create' ) ) {
			throw new Exception( "PHP sockets extension missing from PHP CLI mode." );
		}
		$sock = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
		if ( $sock === false ) {
			throw new Exception( "socket_create(): " . socket_strerror( socket_last_error() ) );
		}
		socket_set_option( $sock, SOL_SOCKET, SO_REUSEADDR, 1 ); // allow quick re-bind after a restart
		socket_set_nonblock( $sock ); // don't block on accept()
		if ( socket_bind( $sock, $this->address, $this->port ) === false ) {
			// Fetch the error text before closing, then release the socket
			$error = socket_strerror( socket_last_error( $sock ) );
			socket_close( $sock );
			throw new Exception( "socket_bind(): " . $error );
		} elseif ( socket_listen( $sock, $this->maxBacklog ) === false ) {
			$error = socket_strerror( socket_last_error( $sock ) );
			socket_close( $sock );
			throw new Exception( "socket_listen(): " . $error );
		}
		$this->sock = $sock;
		$this->startTime = time();
	}

	/**
	 * Entry-point function that listens to the server socket, accepts
	 * new clients, and receives/responds to requests to lock resources.
	 *
	 * This method loops forever and never returns.
	 */
	public function main() {
		$this->setupServerSocket(); // setup listening socket
		$socketArray = new SocketArray(); // sockets being serviced
		$socketArray->addSocket( $this->sock ); // add listening socket
		do {
			list( $read, $write ) = $socketArray->socketsForSelect();
			// socket_select() takes its arrays by reference, so use a real variable
			$except = null;
			if ( socket_select( $read, $write, $except, null ) < 1 ) {
				continue; // nothing ready (or select failed); wait again
			}
			// Check if there is a client trying to connect...
			// Strict matching: socket handles must be compared by identity.
			// NOTE(review): while at maxClients the listening socket stays readable,
			// so this loop will spin until a client disconnects.
			if ( in_array( $this->sock, $read, true )
				&& $socketArray->size() < $this->maxClients
			) {
				$newSock = socket_accept( $this->sock );
				if ( $newSock ) {
					socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 );
					socket_set_nonblock( $newSock ); // don't block on read()/write()
					$socketArray->addSocket( $newSock );
				}
			}
			// Loop through all the clients that have data to read...
			foreach ( $read as $read_sock ) {
				if ( $read_sock === $this->sock ) {
					continue; // skip listening socket
				}
				// Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471
				$data = socket_read( $read_sock, 65535 );
				// Check if the client is disconnected
				if ( $data === false || $data === '' ) {
					$socketArray->closeSocket( $read_sock );
					$this->recordDeadSocket( $read_sock ); // remove session
				// Check if we reached the end of a message
				} elseif ( substr( $data, -1 ) === "\n" ) {
					// Newline is the last char (given ping-pong message usage)
					$cmd = $socketArray->readRcvBuffer( $read_sock ) . $data;
					// Perform the requested command...
					$response = $this->doCommand( rtrim( $cmd ), $read_sock );
					// Send the response to the client...
					$socketArray->appendSndBuffer( $read_sock, $response . "\n" );
				// Otherwise, we just have more message data to append
				} elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) {
					$socketArray->closeSocket( $read_sock ); // too big
					$this->recordDeadSocket( $read_sock ); // remove session
				}
			}
			// Loop through all the clients that have data to write...
			foreach ( $write as $write_sock ) {
				$bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) );
				// Check if the client is disconnected
				if ( $bytes === false ) {
					$socketArray->closeSocket( $write_sock );
					$this->recordDeadSocket( $write_sock ); // remove session
				// Otherwise, truncate these bytes from the start of the write buffer
				} else {
					$socketArray->consumeSndBuffer( $write_sock, $bytes );
				}
			}
			// Prune dead locks every few socket events...
			if ( ++$this->ticks >= 9 ) {
				$this->ticks = 0;
				$this->purgeExpiredLocks();
			}
		} while ( true );
	}

	/**
	 * Parse and execute a single client message.
	 *
	 * @param $data string Message without the trailing newline
	 * @param $sourceSock resource Socket the message arrived on
	 * @return string Status string to send back to the client
	 */
	protected function doCommand( $data, $sourceSock ) {
		$cmdArr = $this->getCommand( $data );
		if ( is_string( $cmdArr ) ) {
			return $cmdArr; // error
		}
		list( $function, $session, $type, $resources ) = $cmdArr;
		// On first command, track the session => sock correspondence
		if ( !isset( $this->sessions[$session] ) ) {
			$this->sessions[$session] = $sourceSock;
			unset( $this->deadSessions[$session] ); // renew if dead
		}
		if ( $function === 'ACQUIRE' ) {
			return $this->lockHolder->lock( $session, $type, $resources );
		} elseif ( $function === 'RELEASE' ) {
			return $this->lockHolder->unlock( $session, $type, $resources );
		} elseif ( $function === 'RELEASE_ALL' ) {
			return $this->lockHolder->release( $session );
		} elseif ( $function === 'STAT' ) {
			return $this->stat();
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Validate a raw message and split it into its components.
	 *
	 * @param $data string "<session>:<key>:<command>:<type>:<values>"
	 * @return Array|string (command, session, type, list of values) on success,
	 *   or one of the error strings BAD_FORMAT, BAD_KEY, BAD_SESSION,
	 *   BAD_COMMAND, BAD_TYPE on failure
	 */
	protected function getCommand( $data ) {
		$m = explode( ':', $data ); // <session, key, command, type, values>
		if ( count( $m ) !== 5 ) {
			return 'BAD_FORMAT';
		}
		list( $session, $key, $command, $type, $values ) = $m;
		// The key authenticates the message: it must be the SHA-1 of the other
		// fields plus the shared secret. Compare in constant time when possible.
		$expected = sha1( $session . $command . $type . $values . $this->authKey );
		$keyOk = function_exists( 'hash_equals' )
			? hash_equals( $expected, $key )
			: ( $expected === $key );
		if ( !$keyOk ) {
			return 'BAD_KEY';
		} elseif ( strlen( $session ) !== 31 ) {
			return 'BAD_SESSION';
		}
		$values = explode( '|', $values );
		if ( $command === 'ACQUIRE' || $command === 'RELEASE' ) {
			$needsLockArgs = true;
		} elseif ( $command === 'RELEASE_ALL' || $command === 'STAT' ) {
			$needsLockArgs = false;
		} else {
			return 'BAD_COMMAND';
		}
		if ( $needsLockArgs ) {
			if ( $type !== 'SH' && $type !== 'EX' ) {
				return 'BAD_TYPE';
			}
			// Every resource key must be exactly 31 bytes long
			foreach ( $values as $value ) {
				if ( strlen( $value ) !== 31 ) {
					return 'BAD_FORMAT';
				}
			}
		}
		return array( $command, $session, $type, $values );
	}

	/**
	 * Remove a socket's corresponding session from tracking and
	 * store it in the dead session tracking if it still has locks.
	 *
	 * @param $socket resource
	 * @return bool Whether a session was tracked for this socket
	 */
	protected function recordDeadSocket( $socket ) {
		// Strict search so that distinct socket handles are never confused
		$session = array_search( $socket, $this->sessions, true );
		if ( $session !== false ) {
			unset( $this->sessions[$session] );
			// Record recently killed sessions that still have locks
			if ( $this->lockHolder->sessionHasLocks( $session ) ) {
				$this->deadSessions[$session] = time();
			}
			return true;
		}
		return false;
	}

	/**
	 * Clear locks for sessions that have been dead for a while
	 *
	 * @return integer Number of sessions purged
	 */
	protected function purgeExpiredLocks() {
		$count = 0;
		$now = time();
		foreach ( $this->deadSessions as $session => $timestamp ) {
			if ( ( $now - $timestamp ) > $this->lockTimeout ) {
				$this->lockHolder->release( $session );
				unset( $this->deadSessions[$session] );
				++$count;
			}
		}
		return $count;
	}

	/**
	 * Get the daemon uptime and memory usage
	 *
	 * @return string "<seconds since start>:<bytes of memory in use>"
	 */
	protected function stat() {
		return ( time() - $this->startTime ) . ':' . memory_get_usage();
	}
}
303
/**
 * LockServerDaemon helper class that keeps track of socket states.
 *
 * Each registered socket has a receive buffer and a send buffer, stored in
 * parallel arrays that share the same keys as the socket list.
 */
class SocketArray {
	/** @var Array */
	protected $clients = array(); // array of client sockets
	/** @var Array */
	protected $rBuffers = array(); // corresponding socket read buffers
	/** @var Array */
	protected $wBuffers = array(); // corresponding socket write buffers

	const BUFFER_SIZE = 65535;

	/**
	 * Split the sockets into those waiting to read and those waiting to write.
	 * A socket with pending outgoing data is only offered for writing.
	 *
	 * @return Array (list of sockets to read, list of sockets to write)
	 */
	public function socketsForSelect() {
		$rSockets = array();
		$wSockets = array();
		foreach ( $this->clients as $key => $socket ) {
			if ( $this->wBuffers[$key] !== '' ) {
				$wSockets[] = $socket; // wait for writing to unblock
			} else {
				$rSockets[] = $socket; // wait for reading to unblock
			}
		}
		return array( $rSockets, $wSockets );
	}

	/**
	 * @return integer Number of registered sockets
	 */
	public function size() {
		return count( $this->clients );
	}

	/**
	 * Register a socket with empty read and write buffers.
	 *
	 * @param $sock resource
	 * @return bool Always true
	 */
	public function addSocket( $sock ) {
		$this->clients[] = $sock;
		$this->rBuffers[] = '';
		$this->wBuffers[] = '';
		return true;
	}

	/**
	 * Close a registered socket and drop its buffers.
	 *
	 * @param $sock resource
	 * @return bool False if the socket is not registered
	 */
	public function closeSocket( $sock ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		socket_close( $sock );
		unset( $this->clients[$key] );
		unset( $this->rBuffers[$key] );
		unset( $this->wBuffers[$key] );
		return true;
	}

	/**
	 * Append received data to a socket's read buffer.
	 *
	 * @param $sock resource
	 * @param $data string
	 * @return bool False if the socket is unknown or the buffer would exceed BUFFER_SIZE
	 */
	public function appendRcvBuffer( $sock, $data ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		} elseif ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
			return false;
		}
		$this->rBuffers[$key] .= $data;
		return true;
	}

	/**
	 * Fetch and clear a socket's read buffer.
	 *
	 * @param $sock resource
	 * @return string|bool Buffered data, or false if the socket is unknown
	 */
	public function readRcvBuffer( $sock ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		$data = $this->rBuffers[$key];
		$this->rBuffers[$key] = ''; // consume data
		return $data;
	}

	/**
	 * Queue data to be sent on a socket.
	 *
	 * @param $sock resource
	 * @param $data string
	 * @return bool False if the socket is unknown or the buffer would exceed BUFFER_SIZE
	 */
	public function appendSndBuffer( $sock, $data ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		} elseif ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
			return false;
		}
		$this->wBuffers[$key] .= $data;
		return true;
	}

	/**
	 * Peek at a socket's pending outgoing data without consuming it.
	 *
	 * @param $sock resource
	 * @return string|bool Pending data, or false if the socket is unknown
	 */
	public function readSndBuffer( $sock ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		return $this->wBuffers[$key];
	}

	/**
	 * Drop the first $bytes bytes of a socket's pending outgoing data.
	 *
	 * @param $sock resource
	 * @param $bytes integer Number of bytes that were actually written
	 * @return bool False if the socket is unknown
	 */
	public function consumeSndBuffer( $sock, $bytes ) {
		$key = $this->keyOf( $sock );
		if ( $key === false ) {
			return false;
		}
		// The cast covers substr() returning false when nothing is left
		$this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes );
		return true;
	}

	/**
	 * Find the internal key of a registered socket.
	 *
	 * The search is strict (identity): with loose comparison two distinct
	 * socket objects compare equal, which would make every lookup resolve
	 * to the first registered socket.
	 *
	 * @param $sock resource
	 * @return integer|bool Key into the internal arrays, or false if unknown
	 */
	protected function keyOf( $sock ) {
		return array_search( $sock, $this->clients, true );
	}
}
439
/**
 * LockServerDaemon helper class that keeps track of the locks.
 *
 * Shared (SH) locks may be held by several sessions at once; an exclusive
 * (EX) lock on a key is held by a single session. Reverse indexes by
 * session allow all of a session's locks to be released at once.
 */
class LockHolder {
	/** @var Array */
	protected $shLocks = array(); // (key => session => 1)
	/** @var Array */
	protected $exLocks = array(); // (key => session)

	/** @var Array */
	protected $sessionIndexSh = array(); // (session => key => 1)
	/** @var Array */
	protected $sessionIndexEx = array(); // (session => key => 1)
	protected $lockCount = 0; // integer

	protected $maxLocks; // integer

	/**
	 * @param $maxLocks integer Maximum number of locks to allow
	 */
	public function __construct( $maxLocks ) {
		$this->maxLocks = $maxLocks;
	}

	/**
	 * Check whether a session currently holds any lock.
	 *
	 * @param $session string
	 * @return bool
	 */
	public function sessionHasLocks( $session ) {
		return isset( $this->sessionIndexSh[$session] )
			|| isset( $this->sessionIndexEx[$session] );
	}

	/**
	 * Acquire locks on all of the given keys, or on none of them.
	 *
	 * @param $session string
	 * @param $type string 'SH' or 'EX'
	 * @param $keys Array
	 * @return string ACQUIRED, CANT_ACQUIRE, TOO_MANY_LOCKS or INTERNAL_ERROR
	 */
	public function lock( $session, $type, array $keys ) {
		// Conservative limit check: assumes every requested key adds a new lock
		if ( ( $this->lockCount + count( $keys ) ) > $this->maxLocks ) {
			return 'TOO_MANY_LOCKS';
		}
		if ( $type === 'SH' ) {
			// Check if any keys are already write-locked...
			foreach ( $keys as $key ) {
				if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) {
					return 'CANT_ACQUIRE';
				}
			}
			// Acquire the read-locks...
			foreach ( $keys as $key ) {
				$this->set_sh_lock( $key, $session );
			}
			return 'ACQUIRED';
		} elseif ( $type === 'EX' ) {
			// Check if any keys are already read-locked or write-locked...
			foreach ( $keys as $key ) {
				if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] !== $session ) {
					return 'CANT_ACQUIRE';
				}
				if ( isset( $this->shLocks[$key] ) ) {
					foreach ( $this->shLocks[$key] as $otherSession => $x ) {
						if ( $otherSession !== $session ) {
							return 'CANT_ACQUIRE';
						}
					}
				}
			}
			// Acquire the write-locks...
			foreach ( $keys as $key ) {
				$this->set_ex_lock( $key, $session );
			}
			return 'ACQUIRED';
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Release the given session's locks of one type on the given keys.
	 * Keys that the session does not hold are ignored.
	 *
	 * @param $session string
	 * @param $type string 'SH' or 'EX'
	 * @param $keys Array
	 * @return string RELEASED or INTERNAL_ERROR
	 */
	public function unlock( $session, $type, array $keys ) {
		if ( $type === 'SH' ) {
			foreach ( $keys as $key ) {
				$this->unset_sh_lock( $key, $session );
			}
			return 'RELEASED';
		} elseif ( $type === 'EX' ) {
			foreach ( $keys as $key ) {
				$this->unset_ex_lock( $key, $session );
			}
			return 'RELEASED';
		}
		return 'INTERNAL_ERROR';
	}

	/**
	 * Release every lock held by a session.
	 *
	 * @param $session string
	 * @return string RELEASED_ALL
	 */
	public function release( $session ) {
		if ( isset( $this->sessionIndexSh[$session] ) ) {
			foreach ( $this->sessionIndexSh[$session] as $key => $x ) {
				$this->unset_sh_lock( $key, $session );
			}
		}
		if ( isset( $this->sessionIndexEx[$session] ) ) {
			foreach ( $this->sessionIndexEx[$session] as $key => $x ) {
				$this->unset_ex_lock( $key, $session );
			}
		}
		return 'RELEASED_ALL';
	}

	/**
	 * Record a shared lock; a no-op if the session already holds it.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function set_sh_lock( $key, $session ) {
		if ( !isset( $this->shLocks[$key][$session] ) ) {
			$this->shLocks[$key][$session] = 1;
			$this->sessionIndexSh[$session][$key] = 1;
			++$this->lockCount; // we are adding a lock
		}
	}

	/**
	 * Record an exclusive lock; a no-op if the key is already exclusively
	 * locked. Callers must have verified that no other session holds it.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function set_ex_lock( $key, $session ) {
		// exLocks maps key => session (a string), so test the key itself.
		// Indexing the stored string by session never matches, which used to
		// count the same lock again on every re-acquire.
		if ( !isset( $this->exLocks[$key] ) ) {
			$this->exLocks[$key] = $session;
			$this->sessionIndexEx[$session][$key] = 1;
			++$this->lockCount; // we are adding a lock
		}
	}

	/**
	 * Remove a shared lock if the session holds it.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function unset_sh_lock( $key, $session ) {
		if ( isset( $this->shLocks[$key][$session] ) ) {
			unset( $this->shLocks[$key][$session] );
			// Drop empty containers so isset() checks stay meaningful
			if ( !count( $this->shLocks[$key] ) ) {
				unset( $this->shLocks[$key] );
			}
			unset( $this->sessionIndexSh[$session][$key] );
			if ( !count( $this->sessionIndexSh[$session] ) ) {
				unset( $this->sessionIndexSh[$session] );
			}
			--$this->lockCount;
		}
	}

	/**
	 * Remove an exclusive lock if the session holds it.
	 *
	 * @param $key string
	 * @param $session string
	 * @return void
	 */
	protected function unset_ex_lock( $key, $session ) {
		if ( isset( $this->exLocks[$key] ) && $this->exLocks[$key] === $session ) {
			unset( $this->exLocks[$key] );
			unset( $this->sessionIndexEx[$session][$key] );
			// Drop the empty index so sessionHasLocks() reports false
			if ( !count( $this->sessionIndexEx[$session] ) ) {
				unset( $this->sessionIndexEx[$session] );
			}
			--$this->lockCount;
		}
	}
}