In LockServerDaemon:
authorAaron Schulz <aaron@users.mediawiki.org>
Sat, 28 Jan 2012 20:54:19 +0000 (20:54 +0000)
committerAaron Schulz <aaron@users.mediawiki.org>
Sat, 28 Jan 2012 20:54:19 +0000 (20:54 +0000)
* r109802: fixed references to bogus vars that are now in LockHolder.
* Tweaked doCommand() so that if a client re-connects, it's removed from the deadSessions member.
* Made all socket operations non-blocking. Uses new SocketArray helper class.
* Bumped default 'maxLocks' parameter to 10000.

maintenance/locking/LockServerDaemon.php

index cba0454..629daf5 100644 (file)
@@ -1,5 +1,7 @@
 <?php
-
+/**
+ * This code should not require MediaWiki setup or PHP files.
+ */
 if ( php_sapi_name() !== 'cli' ) {
        die( "This is not a valid entry point.\n" );
 }
@@ -10,13 +12,12 @@ set_time_limit( 0 );
 LockServerDaemon::init(
        getopt( '', array(
                'address:', 'port:', 'authKey:',
-               'connTimeout::', 'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::',
+               'lockTimeout::', 'maxClients::', 'maxBacklog::', 'maxLocks::',
        ) )
 )->main();
 
 /**
- * Simple lock server daemon that accepts lock/unlock requests.
- * This should not require MediaWiki setup or PHP files.
+ * Simple lock server daemon that accepts lock/unlock requests
  */
 class LockServerDaemon {
        /** @var resource */
@@ -29,10 +30,9 @@ class LockServerDaemon {
        /** @var LockHolder */
        protected $lockHolder;
 
-       protected $address; // string (IP/hostname)
+       protected $address; // string IP address
        protected $port; // integer
        protected $authKey; // string key
-       protected $connTimeout; // array ( 'sec' => integer, 'usec' => integer )
        protected $lockTimeout; // integer number of seconds
        protected $maxBacklog; // integer
        protected $maxClients; // integer
@@ -40,6 +40,7 @@ class LockServerDaemon {
        protected $startTime; // integer UNIX timestamp
        protected $ticks = 0; // integer counter
 
+       /* @var LockServerDaemon */
        protected static $instance = null;
 
        /**
@@ -54,7 +55,7 @@ class LockServerDaemon {
                        if ( !isset( $config[$par] ) ) {
                                die( "Usage: php LockServerDaemon.php " .
                                        "--address <address> --port <port> --authkey <key> " .
-                                       "[--connTimeout <seconds>] [--lockTimeout <seconds>] " .
+                                       "[--lockTimeout <seconds>] " .
                                        "[--maxLocks <integer>] [--maxClients <integer>] [--maxBacklog <integer>]"
                                );
                        }
@@ -72,13 +73,6 @@ class LockServerDaemon {
                $this->port = $config['port'];
                $this->authKey = $config['authKey'];
                // Parameters with defaults...
-               $connTimeout = isset( $config['connTimeout'] )
-                       ? $config['connTimeout']
-                       : 1.5;
-               $this->connTimeout = array(
-                       'sec'  => floor( $connTimeout ),
-                       'usec' => floor( ( $connTimeout - floor( $connTimeout ) ) * 1e6 )
-               );
                $this->lockTimeout = isset( $config['lockTimeout'] )
                        ? (int)$config['lockTimeout']
                        : 60;
@@ -90,7 +84,7 @@ class LockServerDaemon {
                        : 100;
                $maxLocks = isset( $config['maxLocks'] )
                        ? (int)$config['maxLocks']
-                       : 5000;
+                       : 10000;
 
                $this->lockHolder = new LockHolder( $maxLocks );
        }
@@ -98,7 +92,7 @@ class LockServerDaemon {
        /**
         * @return void
         */
-       protected function setupSocket() {
+       protected function setupServerSocket() {
                if ( !function_exists( 'socket_create' ) ) {
                        throw new Exception( "PHP sockets extension missing from PHP CLI mode." );
                }
@@ -116,75 +110,69 @@ class LockServerDaemon {
                                socket_strerror( socket_last_error( $sock ) ) );
                }
                $this->sock = $sock;
-
                $this->startTime = time();
        }
 
        /**
-        * @return void
+        * Entry-point function that listens to the server socket, accepts
+        * new clients, and recieves/responds to requests to lock resources.
         */
        public function main() {
-               // Setup socket and start listing
-               $this->setupSocket();
-               // Create a list of all the clients that will be connected to us.
-               $clients = array( $this->sock ); // start off with listening socket
+               $this->setupServerSocket(); // setup listening socket
+               $socketArray = new SocketArray(); // sockets being serviced
+               $socketArray->addSocket( $this->sock ); // add listening socket
                do {
-                       // Create a copy, so $clients doesn't get modified by socket_select()
-                       $read = $clients; // clients-with-data (plus listening socket)
-                       // Get a list of all the clients that have data to be read from
-                       $changed = socket_select( $read, $write = NULL, $except = NULL, NULL );
-                       if ( $changed === false ) {
-                               trigger_error( 'socket_listen(): ' . socket_strerror( socket_last_error() ) );
-                               continue;
-                       } elseif ( $changed < 1 ) {
+                       list( $read, $write ) = $socketArray->socketsForSelect();
+                       if ( socket_select( $read, $write, $except = NULL, NULL ) < 1 ) {
                                continue; // wait
                        }
                        // Check if there is a client trying to connect...
-                       if ( in_array( $this->sock, $read ) && count( $clients ) < $this->maxClients ) {
-                               // Accept the new client...
-                               $newsock = socket_accept( $this->sock );
-                               if ( $newsock ) {
-                                       socket_set_option( $newsock, SOL_SOCKET, SO_KEEPALIVE, 1 );
-                                       socket_set_option( $newsock, SOL_SOCKET, SO_RCVTIMEO, $this->connTimeout );
-                                       socket_set_option( $newsock, SOL_SOCKET, SO_SNDTIMEO, $this->connTimeout );
-                                       $clients[] = $newsock;
-                                       // Remove the listening socket from the clients-with-data array...
-                                       $key = array_search( $this->sock, $read );
-                                       unset( $read[$key] );
+                       if ( in_array( $this->sock, $read ) && $socketArray->size() < $this->maxClients ) {
+                               $newSock = socket_accept( $this->sock );
+                               if ( $newSock ) {
+                                       socket_set_option( $newSock, SOL_SOCKET, SO_KEEPALIVE, 1 );
+                                       socket_set_nonblock( $newSock ); // don't block on read()/write()
+                                       $socketArray->addSocket( $newSock );
                                }
                        }
                        // Loop through all the clients that have data to read...
                        foreach ( $read as $read_sock ) {
-                               // Read until newline or 65535 bytes are recieved.
-                               // socket_read show errors when the client is disconnected.
-                               $data = @socket_read( $read_sock, 65535, PHP_NORMAL_READ );
+                               if ( $read_sock === $this->sock ) {
+                                       continue; // skip listening socket
+                               }
+                               // Avoids PHP_NORMAL_READ per https://bugs.php.net/bug.php?id=33471
+                               $data = socket_read( $read_sock, 65535 );
                                // Check if the client is disconnected
-                               if ( $data === false ) {
-                                       // Remove client from $clients list
-                                       $key = array_search( $read_sock, $clients );
-                                       unset( $clients[$key] );
-                                       // Remove socket's session from tracking (if it exists)
-                                       $session = array_search( $read_sock, $this->sessions );
-                                       if ( $session !== false ) {
-                                               unset( $this->sessions[$session] );
-                                               // Record recently killed sessions that still have locks
-                                               if ( isset( $this->sessionIndexSh[$session] )
-                                                       || isset( $this->sessionIndexEx[$session] ) )
-                                               {
-                                                       $this->deadSessions[$session] = time();
-                                               }
-                                       }
-                               } else {
+                               if ( $data === false || $data === '' ) {
+                                       $socketArray->closeSocket( $read_sock );
+                                       $this->recordDeadSocket( $read_sock ); // remove session
+                               // Check if we reached the end of a message
+                               } elseif ( substr( $data, -1 ) === "\n" ) {
+                                       // Newline is the last char (given ping-pong message usage)
+                                       $cmd = $socketArray->readRcvBuffer( $read_sock ) . $data;
                                        // Perform the requested command...
-                                       $response = $this->doCommand( trim( $data ), $read_sock );
+                                       $response = $this->doCommand( rtrim( $cmd ), $read_sock );
                                        // Send the response to the client...
-                                       if ( socket_write( $read_sock, "$response\n" ) === false ) {
-                                               trigger_error( 'socket_write(): ' .
-                                                       socket_strerror( socket_last_error( $read_sock ) ) );
-                                       }
+                                       $socketArray->appendSndBuffer( $read_sock, $response . "\n" );
+                               // Otherwise, we just have more message data to append
+                               } elseif ( !$socketArray->appendRcvBuffer( $read_sock, $data ) ) {
+                                       $socketArray->closeSocket( $read_sock ); // too big
+                                       $this->recordDeadSocket( $read_sock ); // remove session
                                }
                        }
-                       // Prune dead locks every 10 socket events...
+                       // Loop through all the clients that have data to write...
+                       foreach ( $write as $write_sock ) {
+                               $bytes = socket_write( $write_sock, $socketArray->readSndBuffer( $write_sock ) );
+                               // Check if the client is disconnected
+                               if ( $bytes === false ) {
+                                       $socketArray->closeSocket( $write_sock );
+                                       $this->recordDeadSocket( $write_sock ); // remove session
+                               // Otherwise, truncate these bytes from the start of the write buffer
+                               } else {
+                                       $socketArray->consumeSndBuffer( $write_sock, $bytes );
+                               }
+                       }
+                       // Prune dead locks every few socket events...
                        if ( ++$this->ticks >= 9 ) {
                                $this->ticks = 0;
                                $this->purgeExpiredLocks();
@@ -206,6 +194,7 @@ class LockServerDaemon {
                // On first command, track the session => sock correspondence
                if ( !isset( $this->sessions[$session] ) ) {
                        $this->sessions[$session] = $sourceSock;
+                       unset( $this->deadSessions[$session] ); // renew if dead
                }
                if ( $function === 'ACQUIRE' ) {
                        return $this->lockHolder->lock( $session, $type, $resources );
@@ -259,14 +248,34 @@ class LockServerDaemon {
                return 'BAD_FORMAT';
        }
 
+       /**
+        * Remove a socket's corresponding session from tracking and
+        * store it in the dead session tracking if it still has locks.
+        * 
+        * @param $socket resource
+        * @return book
+        */
+       protected function recordDeadSocket( $socket ) {
+               $session = array_search( $socket, $this->sessions );
+               if ( $session !== false ) {
+                       unset( $this->sessions[$session] );
+                       // Record recently killed sessions that still have locks
+                       if ( $this->lockHolder->sessionHasLocks( $session ) ) {
+                               $this->deadSessions[$session] = time();
+                       }
+                       return true;
+               }
+               return false;
+       }
+
        /**
         * Clear locks for sessions that have been dead for a while
         *
         * @return integer Number of sessions purged
         */
        protected function purgeExpiredLocks() {
-               $now = time();
                $count = 0;
+               $now = time();
                foreach ( $this->deadSessions as $session => $timestamp ) {
                        if ( ( $now - $timestamp ) > $this->lockTimeout ) {
                                $this->lockHolder->release( $session );
@@ -288,8 +297,143 @@ class LockServerDaemon {
 }
 
 /**
- * LockServerDaemon helper class that keeps track of the locks.
- * This should not require MediaWiki setup or PHP files.
+ * LockServerDaemon helper class that keeps track socket states
+ */
+class SocketArray {
+       /* @var Array */
+       protected $clients = array(); // array of client sockets
+       /* @var Array */
+       protected $rBuffers = array(); // corresponding socket read buffers
+       /* @var Array */
+       protected $wBuffers = array(); // corresponding socket write buffers
+
+       const BUFFER_SIZE = 65535;
+
+       /**
+        * @return Array (list of sockets to read, list of sockets to write)
+        */
+       public function socketsForSelect() {
+               $rSockets = array();
+               $wSockets = array();
+               foreach ( $this->clients as $key => $socket ) {
+                       if ( $this->wBuffers[$key] !== '' ) {
+                               $wSockets[] = $socket; // wait for writing to unblock
+                       } else {
+                               $rSockets[] = $socket; // wait for reading to unblock
+                       }
+               }
+               return array( $rSockets, $wSockets );
+       }
+
+       /**
+        * @return integer Number of client sockets
+        */
+       public function size() {
+               return count( $this->clients );
+       }
+
+       /**
+        * @param $sock resource
+        * @return bool
+        */
+       public function addSocket( $sock ) {
+               $this->clients[] = $sock;
+               $this->rBuffers[] = '';
+               $this->wBuffers[] = '';
+               return true;
+       }
+
+       /**
+        * @param $sock resource
+        * @return bool
+        */
+       public function closeSocket( $sock ) {
+               $key = array_search( $sock, $this->clients );
+               if ( $key === false ) {
+                       return false;
+               }
+               socket_close( $sock );
+               unset( $this->clients[$key] );
+               unset( $this->rBuffers[$key] );
+               unset( $this->wBuffers[$key] );
+               return true;
+       }
+
+       /**
+        * @param $sock resource
+        * @param $data string
+        * @return bool
+        */
+       public function appendRcvBuffer( $sock, $data ) {
+               $key = array_search( $sock, $this->clients );
+               if ( $key === false ) {
+                       return false;
+               } elseif ( ( strlen( $this->rBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
+                       return false;
+               }
+               $this->rBuffers[$key] .= $data;
+               return true;
+       }
+
+       /**
+        * @param $sock resource
+        * @return string|false
+        */
+       public function readRcvBuffer( $sock ) {
+               $key = array_search( $sock, $this->clients );
+               if ( $key === false ) {
+                       return false;
+               }
+               $data = $this->rBuffers[$key];
+               $this->rBuffers[$key] = ''; // consume data
+               return $data;
+       }
+
+       /**
+        * @param $sock resource
+        * @param $data string
+        * @return bool
+        */
+       public function appendSndBuffer( $sock, $data ) {
+               $key = array_search( $sock, $this->clients );
+               if ( $key === false ) {
+                       return false;
+               } elseif ( ( strlen( $this->wBuffers[$key] ) + strlen( $data ) ) > self::BUFFER_SIZE ) {
+                       return false;
+               }
+               $this->wBuffers[$key] .= $data;
+               return true;
+       }
+
+       /**
+        * @param $sock resource
+        * @return bool
+        */
+       public function readSndBuffer( $sock ) {
+               $key = array_search( $sock, $this->clients );
+               if ( $key === false ) {
+                       return false;
+               }
+               return $this->wBuffers[$key];
+       }
+
+       /**
+        * @param $sock resource
+        * @param $bytes integer
+        * @return bool
+        */
+       public function consumeSndBuffer( $sock, $bytes ) {
+               $key = array_search( $sock, $this->clients );
+               if ( $key === false ) {
+                       return false;
+               }
+               $this->wBuffers[$key] = (string)substr( $this->wBuffers[$key], $bytes );
+               return true;
+       }
+}
+
+/**
+ * LockServerDaemon helper class that keeps track of the locks
  */
 class LockHolder {
        /** @var Array */
@@ -312,6 +456,15 @@ class LockHolder {
                $this->maxLocks = $maxLocks;
        }
 
+       /**
+        * @param $session string
+        * @return bool 
+        */
+       public function sessionHasLocks( $session ) {
+               return isset( $this->sessionIndexSh[$session] ) 
+                       || isset( $this->sessionIndexEx[$session] );
+       }
+
        /**
         * @param $session string
         * @param $type string