Merge "Type hint against LinkTarget in WatchedItemStore"
[lhc/web/wiklou.git] / includes / libs / objectcache / MemcachedPeclBagOStuff.php
index 43cebd3..cc7ee2a 100644 (file)
  */
 class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
        /** @var Memcached */
-       protected $client;
+       protected $syncClient;
+       /** @var Memcached|null */
+       protected $asyncClient;
+
+       /** @var bool Whether the non-buffering client is locked from use */
+       protected $syncClientIsBuffering = false;
+       /** @var bool Whether the non-buffering client should be flushed before use */
+       protected $hasUnflushedChanges = false;
+
+       /** @var array Memcached options */
+       private static $OPTS_SYNC_WRITES = [
+               Memcached::OPT_NO_BLOCK => false, // async I/O (using TCP buffers)
+               Memcached::OPT_BUFFER_WRITES => false // libmemcached buffers
+       ];
+       /** @var array Memcached options */
+       private static $OPTS_ASYNC_WRITES = [
+               Memcached::OPT_NO_BLOCK => true, // async I/O (using TCP buffers)
+               Memcached::OPT_BUFFER_WRITES => true // libmemcached buffers
+       ];
 
        /**
         * Available parameters are:
-        *   - servers:             The list of IP:port combinations holding the memcached servers.
-        *   - persistent:          Whether to use a persistent connection
-        *   - compress_threshold:  The minimum size an object must be before it is compressed
-        *   - timeout:             The read timeout in microseconds
-        *   - connect_timeout:     The connect timeout in seconds
-        *   - retry_timeout:       Time in seconds to wait before retrying a failed connect attempt
-        *   - server_failure_limit:  Limit for server connect failures before it is removed
-        *   - serializer:          May be either "php" or "igbinary". Igbinary produces more compact
-        *                          values, but serialization is much slower unless the php.ini option
-        *                          igbinary.compact_strings is off.
-        *   - use_binary_protocol  Whether to enable the binary protocol (default is ASCII) (boolean)
+        *   - servers:              List of IP:port combinations holding the memcached servers.
+        *   - persistent:           Whether to use a persistent connection
+        *   - compress_threshold:   The minimum size an object must be before it is compressed
+        *   - timeout:              The read timeout in microseconds
+        *   - connect_timeout:      The connect timeout in seconds
+        *   - retry_timeout:        Time in seconds to wait before retrying a failed connect attempt
+        *   - server_failure_limit: Limit for server connect failures before it is removed
+        *   - serializer:           Either "php" or "igbinary". Igbinary produces more compact
+        *                           values, but serialization is much slower unless the php.ini
+        *                           option igbinary.compact_strings is off.
+        *   - use_binary_protocol   Whether to enable the binary protocol (default is ASCII)
+        *   - allow_tcp_nagle_delay Whether to permit Nagle's algorithm for reducing packet count
         * @param array $params
-        * @throws InvalidArgumentException
         */
        function __construct( $params ) {
                parent::__construct( $params );
-               $params = $this->applyDefaultParams( $params );
+
+               // Default class-specific parameters
+               $params += [
+                       'compress_threshold' => 1500,
+                       'connect_timeout' => 0.5,
+                       'serializer' => 'php',
+                       'use_binary_protocol' => false,
+                       'allow_tcp_nagle_delay' => true
+               ];
 
                if ( $params['persistent'] ) {
                        // The pool ID must be unique to the server/option combination.
                        // The Memcached object is essentially shared for each pool ID.
                        // We can only reuse a pool ID if we keep the config consistent.
-                       $this->client = new Memcached( md5( serialize( $params ) ) );
-                       if ( count( $this->client->getServerList() ) ) {
-                               $this->logger->debug( __METHOD__ . ": persistent Memcached object already loaded." );
-                               return; // already initialized; don't add duplicate servers
-                       }
+                       $connectionPoolId = md5( serialize( $params ) );
+                       $syncClient = new Memcached( "$connectionPoolId-sync" );
+                       // Avoid clobbering the main thread-shared Memcached instance
+                       $asyncClient = new Memcached( "$connectionPoolId-async" );
                } else {
-                       $this->client = new Memcached;
-               }
-
-               if ( $params['use_binary_protocol'] ) {
-                       $this->client->setOption( Memcached::OPT_BINARY_PROTOCOL, true );
-               }
-
-               if ( isset( $params['retry_timeout'] ) ) {
-                       $this->client->setOption( Memcached::OPT_RETRY_TIMEOUT, $params['retry_timeout'] );
+                       $syncClient = new Memcached();
+                       $asyncClient = null;
                }
 
-               if ( isset( $params['server_failure_limit'] ) ) {
-                       $this->client->setOption( Memcached::OPT_SERVER_FAILURE_LIMIT, $params['server_failure_limit'] );
+               $this->initializeClient( $syncClient, $params, self::$OPTS_SYNC_WRITES );
+               if ( $asyncClient ) {
+                       $this->initializeClient( $asyncClient, $params, self::$OPTS_ASYNC_WRITES );
                }
 
+               // Set the main client and any dedicated one for buffered writes
+               $this->syncClient = $syncClient;
+               $this->asyncClient = $asyncClient;
                // The compression threshold is an undocumented php.ini option for some
                // reason. There's probably not much harm in setting it globally, for
                // compatibility with the settings for the PHP client.
                ini_set( 'memcached.compression_threshold', $params['compress_threshold'] );
+       }
 
-               // Set timeouts
-               $this->client->setOption( Memcached::OPT_CONNECT_TIMEOUT, $params['connect_timeout'] * 1000 );
-               $this->client->setOption( Memcached::OPT_SEND_TIMEOUT, $params['timeout'] );
-               $this->client->setOption( Memcached::OPT_RECV_TIMEOUT, $params['timeout'] );
-               $this->client->setOption( Memcached::OPT_POLL_TIMEOUT, $params['timeout'] / 1000 );
-
-               // Set libketama mode since it's recommended by the documentation and
-               // is as good as any. There's no way to configure libmemcached to use
-               // hashes identical to the ones currently in use by the PHP client, and
-               // even implementing one of the libmemcached hashes in pure PHP for
-               // forwards compatibility would require MemcachedClient::get_sock() to be
-               // rewritten.
-               $this->client->setOption( Memcached::OPT_LIBKETAMA_COMPATIBLE, true );
-
-               // Set the serializer
-               $ok = false;
+       /**
+        * Initialize the client only if needed and reuse it otherwise.
+        * This avoids duplicate servers in the list and new connections.
+        *
+        * @param Memcached $client
+        * @param array $params
+        * @param array $options Base options for Memcached::setOptions()
+        * @throws RuntimeException
+        */
+       private function initializeClient( Memcached $client, array $params, array $options ) {
+               if ( $client->getServerList() ) {
+                       $this->logger->debug( __METHOD__ . ": pre-initialized client instance." );
+
+                       return; // preserve persistent handle
+               }
+
+               $this->logger->debug( __METHOD__ . ": initializing new client instance." );
+
+               $options += [
+                       Memcached::OPT_NO_BLOCK => false,
+                       Memcached::OPT_BUFFER_WRITES => false,
+                       // Network protocol (ASCII or binary)
+                       Memcached::OPT_BINARY_PROTOCOL => $params['use_binary_protocol'],
+                       // Set various network timeouts
+                       Memcached::OPT_CONNECT_TIMEOUT => $params['connect_timeout'] * 1000,
+                       Memcached::OPT_SEND_TIMEOUT => $params['timeout'],
+                       Memcached::OPT_RECV_TIMEOUT => $params['timeout'],
+                       Memcached::OPT_POLL_TIMEOUT => $params['timeout'] / 1000,
+                       // Avoid pointless delay when sending/fetching large blobs
+                       Memcached::OPT_TCP_NODELAY => !$params['allow_tcp_nagle_delay'],
+                       // Set libketama mode since it's recommended by the documentation
+                       Memcached::OPT_LIBKETAMA_COMPATIBLE => true
+               ];
+               if ( isset( $params['retry_timeout'] ) ) {
+                       $options[Memcached::OPT_RETRY_TIMEOUT] = $params['retry_timeout'];
+               }
+               if ( isset( $params['server_failure_limit'] ) ) {
+                       $options[Memcached::OPT_SERVER_FAILURE_LIMIT] = $params['server_failure_limit'];
+               }
                if ( $params['serializer'] === 'php' ) {
-                       $ok = $this->client->setOption( Memcached::OPT_SERIALIZER, Memcached::SERIALIZER_PHP );
+                       $options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_PHP;
                } elseif ( $params['serializer'] === 'igbinary' ) {
                        if ( !Memcached::HAVE_IGBINARY ) {
-                               throw new InvalidArgumentException(
+                               throw new RuntimeException(
                                        __CLASS__ . ': the igbinary extension is not available ' .
                                        'but igbinary serialization was requested.'
                                );
                        }
-                       $ok = $this->client->setOption( Memcached::OPT_SERIALIZER, Memcached::SERIALIZER_IGBINARY );
+                       $options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_IGBINARY;
                }
-               if ( !$ok ) {
-                       throw new InvalidArgumentException( __CLASS__ . ': invalid serializer parameter' );
+
+               if ( !$client->setOptions( $options ) ) {
+                       throw new RuntimeException(
+                               "Invalid options: " . json_encode( $options, JSON_PRETTY_PRINT )
+                       );
                }
 
                $servers = [];
@@ -121,28 +170,20 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
                                $servers[] = [ $host, false ]; // (ip or path, port)
                        }
                }
-               $this->client->addServers( $servers );
-       }
-
-       protected function applyDefaultParams( $params ) {
-               $params = parent::applyDefaultParams( $params );
 
-               if ( !isset( $params['use_binary_protocol'] ) ) {
-                       $params['use_binary_protocol'] = false;
+               if ( !$client->addServers( $servers ) ) {
+                       throw new RuntimeException( "Failed to inject server address list" );
                }
-
-               if ( !isset( $params['serializer'] ) ) {
-                       $params['serializer'] = 'php';
-               }
-
-               return $params;
        }
 
        protected function doGet( $key, $flags = 0, &$casToken = null ) {
                $this->debug( "get($key)" );
+
+               $client = $this->acquireSyncClient();
                if ( defined( Memcached::class . '::GET_EXTENDED' ) ) { // v3.0.0
+                       /** @noinspection PhpUndefinedClassConstantInspection */
                        $flags = Memcached::GET_EXTENDED;
-                       $res = $this->client->get( $this->validateKeyEncoding( $key ), null, $flags );
+                       $res = $client->get( $this->validateKeyEncoding( $key ), null, $flags );
                        if ( is_array( $res ) ) {
                                $result = $res['value'];
                                $casToken = $res['cas'];
@@ -151,62 +192,77 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
                                $casToken = null;
                        }
                } else {
-                       $result = $this->client->get( $this->validateKeyEncoding( $key ), null, $casToken );
+                       $result = $client->get( $this->validateKeyEncoding( $key ), null, $casToken );
                }
-               $result = $this->checkResult( $key, $result );
-               return $result;
+
+               return $this->checkResult( $key, $result );
        }
 
        protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
                $this->debug( "set($key)" );
-               $result = $this->client->set(
+
+               $client = $this->acquireSyncClient();
+               $result = $client->set(
                        $this->validateKeyEncoding( $key ),
                        $value,
                        $this->fixExpiry( $exptime )
                );
-               if ( $result === false && $this->client->getResultCode() === Memcached::RES_NOTSTORED ) {
+
+               return ( $result === false && $client->getResultCode() === Memcached::RES_NOTSTORED )
                        // "Not stored" is always used as the mcrouter response with AllAsyncRoute
-                       return true;
-               }
-               return $this->checkResult( $key, $result );
+                       ? true
+                       : $this->checkResult( $key, $result );
        }
 
        protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
                $this->debug( "cas($key)" );
-               $result = $this->client->cas( $casToken, $this->validateKeyEncoding( $key ),
-                       $value, $this->fixExpiry( $exptime ) );
+
+               $result = $this->acquireSyncClient()->cas(
+                       $casToken,
+                       $this->validateKeyEncoding( $key ),
+                       $value, $this->fixExpiry( $exptime )
+               );
+
                return $this->checkResult( $key, $result );
        }
 
        protected function doDelete( $key, $flags = 0 ) {
                $this->debug( "delete($key)" );
-               $result = $this->client->delete( $this->validateKeyEncoding( $key ) );
-               if ( $result === false && $this->client->getResultCode() === Memcached::RES_NOTFOUND ) {
+
+               $client = $this->acquireSyncClient();
+               $result = $client->delete( $this->validateKeyEncoding( $key ) );
+
+               return ( $result === false && $client->getResultCode() === Memcached::RES_NOTFOUND )
                        // "Not found" is counted as success in our interface
-                       return true;
-               }
-               return $this->checkResult( $key, $result );
+                       ? true
+                       : $this->checkResult( $key, $result );
        }
 
        public function add( $key, $value, $exptime = 0, $flags = 0 ) {
                $this->debug( "add($key)" );
-               $result = $this->client->add(
+
+               $result = $this->acquireSyncClient()->add(
                        $this->validateKeyEncoding( $key ),
                        $value,
                        $this->fixExpiry( $exptime )
                );
+
                return $this->checkResult( $key, $result );
        }
 
        public function incr( $key, $value = 1 ) {
                $this->debug( "incr($key)" );
-               $result = $this->client->increment( $key, $value );
+
+               $result = $this->acquireSyncClient()->increment( $key, $value );
+
                return $this->checkResult( $key, $result );
        }
 
        public function decr( $key, $value = 1 ) {
                $this->debug( "decr($key)" );
-               $result = $this->client->decrement( $key, $value );
+
+               $result = $this->acquireSyncClient()->decrement( $key, $value );
+
                return $this->checkResult( $key, $result );
        }
 
@@ -225,22 +281,25 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
                if ( $result !== false ) {
                        return $result;
                }
-               switch ( $this->client->getResultCode() ) {
+
+               $client = $this->syncClient;
+               switch ( $client->getResultCode() ) {
                        case Memcached::RES_SUCCESS:
                                break;
                        case Memcached::RES_DATA_EXISTS:
                        case Memcached::RES_NOTSTORED:
                        case Memcached::RES_NOTFOUND:
-                               $this->debug( "result: " . $this->client->getResultMessage() );
+                               $this->debug( "result: " . $client->getResultMessage() );
                                break;
                        default:
-                               $msg = $this->client->getResultMessage();
+                               $msg = $client->getResultMessage();
                                $logCtx = [];
                                if ( $key !== false ) {
-                                       $server = $this->client->getServerByKey( $key );
+                                       $server = $client->getServerByKey( $key );
                                        $logCtx['memcached-server'] = "{$server['host']}:{$server['port']}";
                                        $logCtx['memcached-key'] = $key;
-                                       $msg = "Memcached error for key \"{memcached-key}\" on server \"{memcached-server}\": $msg";
+                                       $msg = "Memcached error for key \"{memcached-key}\" " .
+                                               "on server \"{memcached-server}\": $msg";
                                } else {
                                        $msg = "Memcached error: $msg";
                                }
@@ -250,43 +309,73 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
                return $result;
        }
 
-       public function doGetMulti( array $keys, $flags = 0 ) {
+       protected function doGetMulti( array $keys, $flags = 0 ) {
                $this->debug( 'getMulti(' . implode( ', ', $keys ) . ')' );
+
                foreach ( $keys as $key ) {
                        $this->validateKeyEncoding( $key );
                }
-               $result = $this->client->getMulti( $keys ) ?: [];
+
+               // The PECL implementation uses "gets" which works as well as a pipeline
+               $result = $this->acquireSyncClient()->getMulti( $keys ) ?: [];
+
                return $this->checkResult( false, $result );
        }
 
-       public function setMulti( array $data, $exptime = 0, $flags = 0 ) {
+       protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
                $this->debug( 'setMulti(' . implode( ', ', array_keys( $data ) ) . ')' );
+
+               $exptime = $this->fixExpiry( $exptime );
                foreach ( array_keys( $data ) as $key ) {
                        $this->validateKeyEncoding( $key );
                }
-               $result = $this->client->setMulti( $data, $this->fixExpiry( $exptime ) );
+
+               // The PECL implementation is a naïve for-loop so use async I/O to pipeline;
+               // https://github.com/php-memcached-dev/php-memcached/blob/master/php_memcached.c#L1852
+               if ( ( $flags & self::WRITE_BACKGROUND ) == self::WRITE_BACKGROUND ) {
+                       $client = $this->acquireAsyncClient();
+                       $result = $client->setMulti( $data, $exptime );
+                       $this->releaseAsyncClient( $client );
+               } else {
+                       $result = $this->acquireSyncClient()->setMulti( $data, $exptime );
+               }
+
                return $this->checkResult( false, $result );
        }
 
-       public function deleteMulti( array $keys, $flags = 0 ) {
+       protected function doDeleteMulti( array $keys, $flags = 0 ) {
                $this->debug( 'deleteMulti(' . implode( ', ', $keys ) . ')' );
+
                foreach ( $keys as $key ) {
                        $this->validateKeyEncoding( $key );
                }
-               $result = $this->client->deleteMulti( $keys ) ?: [];
-               $ok = true;
-               foreach ( $result as $code ) {
+
+               // The PECL implementation is a naïve for-loop so use async I/O to pipeline;
+               // https://github.com/php-memcached-dev/php-memcached/blob/7443d16d02fb73cdba2e90ae282446f80969229c/php_memcached.c#L1852
+               if ( ( $flags & self::WRITE_BACKGROUND ) == self::WRITE_BACKGROUND ) {
+                       $client = $this->acquireAsyncClient();
+                       $resultArray = $client->deleteMulti( $keys ) ?: [];
+                       $this->releaseAsyncClient( $client );
+               } else {
+                       $resultArray = $this->acquireSyncClient()->deleteMulti( $keys ) ?: [];
+               }
+
+               $result = true;
+               foreach ( $resultArray as $code ) {
                        if ( !in_array( $code, [ true, Memcached::RES_NOTFOUND ], true ) ) {
                                // "Not found" is counted as success in our interface
-                               $ok = false;
+                               $result = false;
                        }
                }
-               return $this->checkResult( false, $ok );
+
+               return $this->checkResult( false, $result );
        }
 
-       public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
+       protected function doChangeTTL( $key, $exptime, $flags ) {
                $this->debug( "touch($key)" );
-               $result = $this->client->touch( $key, $exptime );
+
+               $result = $this->acquireSyncClient()->touch( $key, $this->fixExpiry( $exptime ) );
+
                return $this->checkResult( $key, $result );
        }
 
@@ -295,7 +384,7 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
                        return $value;
                }
 
-               $serializer = $this->client->getOption( Memcached::OPT_SERIALIZER );
+               $serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );
                if ( $serializer === Memcached::SERIALIZER_PHP ) {
                        return serialize( $value );
                } elseif ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
@@ -310,7 +399,7 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
                        return (int)$value;
                }
 
-               $serializer = $this->client->getOption( Memcached::OPT_SERIALIZER );
+               $serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );
                if ( $serializer === Memcached::SERIALIZER_PHP ) {
                        return unserialize( $value );
                } elseif ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
@@ -319,4 +408,52 @@ class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
 
                throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
        }
+
+       /**
+        * @return Memcached
+        */
+       private function acquireSyncClient() {
+               if ( $this->syncClientIsBuffering ) {
+                       throw new RuntimeException( "The main (unbuffered I/O) client is locked" );
+               }
+
+               if ( $this->hasUnflushedChanges ) {
+                       // Force a synchronous flush of async writes so that their changes are visible
+                       $this->syncClient->fetch();
+                       if ( $this->asyncClient ) {
+                               $this->asyncClient->fetch();
+                       }
+                       $this->hasUnflushedChanges = false;
+               }
+
+               return $this->syncClient;
+       }
+
+       /**
+        * @return Memcached
+        */
+       private function acquireAsyncClient() {
+               if ( $this->asyncClient ) {
+                       return $this->asyncClient; // dedicated buffering instance
+               }
+
+               // Modify the main instance to temporarily buffer writes
+               $this->syncClientIsBuffering = true;
+               $this->syncClient->setOptions( self::$OPTS_ASYNC_WRITES );
+
+               return $this->syncClient;
+       }
+
+       /**
+        * @param Memcached $client
+        */
+       private function releaseAsyncClient( $client ) {
+               $this->hasUnflushedChanges = true;
+
+               if ( !$this->asyncClient ) {
+                       // This is the main instance; make it stop buffering writes again
+                       $client->setOptions( self::$OPTS_SYNC_WRITES );
+                       $this->syncClientIsBuffering = false;
+               }
+       }
 }