Merge "objectcache: optimize MemcachedPeclBagOStuff::*Multi() write methods"
authorjenkins-bot <jenkins-bot@gerrit.wikimedia.org>
Thu, 18 Jul 2019 15:28:13 +0000 (15:28 +0000)
committerGerrit Code Review <gerrit@wikimedia.org>
Thu, 18 Jul 2019 15:28:13 +0000 (15:28 +0000)
1  2 
includes/libs/objectcache/BagOStuff.php
includes/libs/objectcache/MemcachedPeclBagOStuff.php

@@@ -97,14 -97,15 +97,15 @@@ abstract class BagOStuff implements IEx
        /** @var int[] Map of (ATTR_* class constant => QOS_* class constant) */
        protected $attrMap = [];
  
-       /** Bitfield constants for get()/getMulti() */
-       const READ_LATEST = 1; // use latest data for replicated stores
-       const READ_VERIFIED = 2; // promise that caller can tell when keys are stale
-       /** Bitfield constants for set()/merge() */
-       const WRITE_SYNC = 4; // synchronously write to all locations for replicated stores
-       const WRITE_CACHE_ONLY = 8; // Only change state of the in-memory cache
-       const WRITE_ALLOW_SEGMENTS = 16; // Allow partitioning of the value if it is large
-       const WRITE_PRUNE_SEGMENTS = 32; // Delete all partition segments of the value
+       /** Bitfield constants for get()/getMulti(); these are only advisory */
+       const READ_LATEST = 1; // if supported, avoid reading stale data due to replication
+       const READ_VERIFIED = 2; // promise that the caller handles detection of staleness
+       /** Bitfield constants for set()/merge(); these are only advisory */
+       const WRITE_SYNC = 4; // if supported, block until the write is fully replicated
+       const WRITE_CACHE_ONLY = 8; // only change state of the in-memory cache
+       const WRITE_ALLOW_SEGMENTS = 16; // allow partitioning of the value if it is large
+       const WRITE_PRUNE_SEGMENTS = 32; // delete all the segments if the value is partitioned
+       const WRITE_BACKGROUND = 64; // if supported,
  
        /** @var string Component to use for key construction of blob segment keys */
        const SEGMENT_COMPONENT = 'segment';
         * @param int $flags Bitfield of BagOStuff::WRITE_* constants
         * @return bool Success
         */
 -      protected function mergeViaCas( $key, $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
 +      final protected function mergeViaCas( $key, callable $callback, $exptime, $attempts, $flags ) {
                do {
                        $casToken = null; // passed by reference
                        // Get the old value and CAS token from cache
        /**
         * Delete all objects expiring before a certain date.
         * @param string|int $timestamp The reference date in MW or TS_UNIX format
 -       * @param callable|null $progressCallback Optional, a function which will be called
 +       * @param callable|null $progress Optional, a function which will be called
         *     regularly during long-running operations with the percentage progress
         *     as the first parameter. [optional]
         * @param int $limit Maximum number of keys to delete [default: INF]
         *
 -       * @return bool Success, false if unimplemented
 +       * @return bool Success; false if unimplemented
         */
        public function deleteObjectsExpiringBefore(
                $timestamp,
 -              callable $progressCallback = null,
 +              callable $progress = null,
                $limit = INF
        ) {
 -              // stub
                return false;
        }
  
        /**
         * Get an associative array containing the item for each of the keys that have items.
 -       * @param string[] $keys List of keys
 +       * @param string[] $keys List of keys; can be a map of (unused => key) for convenience
         * @param int $flags Bitfield; supports READ_LATEST [optional]
 -       * @return array Map of (key => value) for existing keys
 +       * @return mixed[] Map of (key => value) for existing keys; preserves the order of $keys
         */
        public function getMulti( array $keys, $flags = 0 ) {
 -              $valuesBykey = $this->doGetMulti( $keys, $flags );
 -              foreach ( $valuesBykey as $key => $value ) {
 +              $foundByKey = $this->doGetMulti( $keys, $flags );
 +
 +              $res = [];
 +              foreach ( $keys as $key ) {
                        // Resolve one blob at a time (avoids too much I/O at once)
 -                      $valuesBykey[$key] = $this->resolveSegments( $key, $value );
 +                      if ( array_key_exists( $key, $foundByKey ) ) {
 +                              // A value should not appear in the key if a segment is missing
 +                              $value = $this->resolveSegments( $key, $foundByKey[$key] );
 +                              if ( $value !== false ) {
 +                                      $res[$key] = $value;
 +                              }
 +                      }
                }
  
 -              return $valuesBykey;
 +              return $res;
        }
  
        /**
         * Get an associative array containing the item for each of the keys that have items.
         * @param string[] $keys List of keys
         * @param int $flags Bitfield; supports READ_LATEST [optional]
 -       * @return array Map of (key => value) for existing keys
 +       * @return mixed[] Map of (key => value) for existing keys
         */
        protected function doGetMulti( array $keys, $flags = 0 ) {
                $res = [];
         *
         * This does not support WRITE_ALLOW_SEGMENTS to avoid excessive read I/O
         *
+        * WRITE_BACKGROUND can be used for bulk insertion where the response is not vital
+        *
         * @param mixed[] $data Map of (key => value)
         * @param int $exptime Either an interval in seconds or a unix timestamp for expiry
         * @param int $flags Bitfield of BagOStuff::WRITE_* constants (since 1.33)
         * @return bool Success
         * @since 1.24
         */
 -      final public function setMulti( array $data, $exptime = 0, $flags = 0 ) {
 +      public function setMulti( array $data, $exptime = 0, $flags = 0 ) {
                if ( ( $flags & self::WRITE_ALLOW_SEGMENTS ) === self::WRITE_ALLOW_SEGMENTS ) {
                        throw new InvalidArgumentException( __METHOD__ . ' got WRITE_ALLOW_SEGMENTS' );
                }
 -
                return $this->doSetMulti( $data, $exptime, $flags );
        }
  
                foreach ( $data as $key => $value ) {
                        $res = $this->doSet( $key, $value, $exptime, $flags ) && $res;
                }
 -
                return $res;
        }
  
         *
         * This does not support WRITE_ALLOW_SEGMENTS to avoid excessive read I/O
         *
+        * WRITE_BACKGROUND can be used for bulk deletion where the response is not vital
+        *
         * @param string[] $keys List of keys
         * @param int $flags Bitfield of BagOStuff::WRITE_* constants
         * @return bool Success
         * @since 1.33
         */
 -      final public function deleteMulti( array $keys, $flags = 0 ) {
 +      public function deleteMulti( array $keys, $flags = 0 ) {
                if ( ( $flags & self::WRITE_ALLOW_SEGMENTS ) === self::WRITE_ALLOW_SEGMENTS ) {
                        throw new InvalidArgumentException( __METHOD__ . ' got WRITE_ALLOW_SEGMENTS' );
                }
 -
                return $this->doDeleteMulti( $keys, $flags );
        }
  
                foreach ( $keys as $key ) {
                        $res = $this->doDelete( $key, $flags ) && $res;
                }
 -
                return $res;
        }
  
         * @param mixed $mainValue
         * @return string|null|bool The combined string, false if missing, null on error
         */
 -      protected function resolveSegments( $key, $mainValue ) {
 +      final protected function resolveSegments( $key, $mainValue ) {
                if ( SerializedValueContainer::isUnified( $mainValue ) ) {
                        return $this->unserialize( $mainValue->{SerializedValueContainer::UNIFIED_DATA} );
                }
         * @param callable $workCallback
         * @since 1.28
         */
 -      public function addBusyCallback( callable $workCallback ) {
 +      final public function addBusyCallback( callable $workCallback ) {
                $this->busyCallbacks[] = $workCallback;
        }
  
         */
        protected function debug( $text ) {
                if ( $this->debugMode ) {
 -                      $this->logger->debug( "{class} debug: $text", [
 -                              'class' => static::class,
 -                      ] );
 +                      $this->logger->debug( "{class} debug: $text", [ 'class' => static::class ] );
                }
        }
  
         * @param int $exptime
         * @return bool
         */
 -      protected function expiryIsRelative( $exptime ) {
 +      final protected function expiryIsRelative( $exptime ) {
                return ( $exptime != 0 && $exptime < ( 10 * self::TTL_YEAR ) );
        }
  
         * @param int $exptime Absolute TTL or 0 for indefinite
         * @return int
         */
 -      protected function convertToExpiry( $exptime ) {
 -              $exptime = (int)$exptime; // sanity
 -
 +      final protected function convertToExpiry( $exptime ) {
                return $this->expiryIsRelative( $exptime )
                        ? (int)$this->getCurrentTime() + $exptime
                        : $exptime;
         * @param int $exptime
         * @return int
         */
 -      protected function convertToRelative( $exptime ) {
 -              if ( $exptime >= ( 10 * self::TTL_YEAR ) ) {
 -                      $exptime -= (int)$this->getCurrentTime();
 -                      if ( $exptime <= 0 ) {
 -                              $exptime = 1;
 -                      }
 -                      return $exptime;
 -              } else {
 -                      return $exptime;
 -              }
 +      final protected function convertToRelative( $exptime ) {
 +              return $this->expiryIsRelative( $exptime )
 +                      ? (int)$exptime
 +                      : max( $exptime - (int)$this->getCurrentTime(), 1 );
        }
  
        /**
         * @param mixed $value
         * @return bool
         */
 -      protected function isInteger( $value ) {
 +      final protected function isInteger( $value ) {
                if ( is_int( $value ) ) {
                        return true;
                } elseif ( !is_string( $value ) ) {
         * @param BagOStuff[] $bags
         * @return int[] Resulting flag map (class ATTR_* constant => class QOS_* constant)
         */
 -      protected function mergeFlagMaps( array $bags ) {
 +      final protected function mergeFlagMaps( array $bags ) {
                $map = [];
                foreach ( $bags as $bag ) {
                        foreach ( $bag->attrMap as $attr => $rank ) {
   */
  class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
        /** @var Memcached */
-       protected $client;
+       protected $syncClient;
+       /** @var Memcached|null */
+       protected $asyncClient;
+       /** @var bool Whether the non-buffering client is locked from use */
+       protected $syncClientIsBuffering = false;
+       /** @var bool Whether the non-buffering client should be flushed before use */
+       protected $hasUnflushedChanges = false;
+       /** @var array Memcached options */
+       private static $OPTS_SYNC_WRITES = [
+               Memcached::OPT_NO_BLOCK => false, // async I/O (using TCP buffers)
+               Memcached::OPT_BUFFER_WRITES => false // libmemcached buffers
+       ];
+       /** @var array Memcached options */
+       private static $OPTS_ASYNC_WRITES = [
+               Memcached::OPT_NO_BLOCK => true, // async I/O (using TCP buffers)
+               Memcached::OPT_BUFFER_WRITES => true // libmemcached buffers
+       ];
  
        /**
         * Available parameters are:
                        // The Memcached object is essentially shared for each pool ID.
                        // We can only reuse a pool ID if we keep the config consistent.
                        $connectionPoolId = md5( serialize( $params ) );
-                       $client = new Memcached( $connectionPoolId );
-                       $this->initializeClient( $client, $params );
+                       $syncClient = new Memcached( "$connectionPoolId-sync" );
+                       // Avoid clobbering the main thread-shared Memcached instance
+                       $asyncClient = new Memcached( "$connectionPoolId-async" );
                } else {
-                       $client = new Memcached;
-                       $this->initializeClient( $client, $params );
+                       $syncClient = new Memcached();
+                       $asyncClient = null;
                }
  
-               $this->client = $client;
+               $this->initializeClient( $syncClient, $params, self::$OPTS_SYNC_WRITES );
+               if ( $asyncClient ) {
+                       $this->initializeClient( $asyncClient, $params, self::$OPTS_ASYNC_WRITES );
+               }
  
+               // Set the main client and any dedicated one for buffered writes
+               $this->syncClient = $syncClient;
+               $this->asyncClient = $asyncClient;
                // The compression threshold is an undocumented php.ini option for some
                // reason. There's probably not much harm in setting it globally, for
                // compatibility with the settings for the PHP client.
         *
         * @param Memcached $client
         * @param array $params
+        * @param array $options Base options for Memcached::setOptions()
         * @throws RuntimeException
         */
-       private function initializeClient( Memcached $client, array $params ) {
+       private function initializeClient( Memcached $client, array $params, array $options ) {
                if ( $client->getServerList() ) {
                        $this->logger->debug( __METHOD__ . ": pre-initialized client instance." );
  
  
                $this->logger->debug( __METHOD__ . ": initializing new client instance." );
  
-               $options = [
+               $options += [
+                       Memcached::OPT_NO_BLOCK => false,
+                       Memcached::OPT_BUFFER_WRITES => false,
                        // Network protocol (ASCII or binary)
                        Memcached::OPT_BINARY_PROTOCOL => $params['use_binary_protocol'],
                        // Set various network timeouts
  
        protected function doGet( $key, $flags = 0, &$casToken = null ) {
                $this->debug( "get($key)" );
+               $client = $this->acquireSyncClient();
                if ( defined( Memcached::class . '::GET_EXTENDED' ) ) { // v3.0.0
                        /** @noinspection PhpUndefinedClassConstantInspection */
                        $flags = Memcached::GET_EXTENDED;
-                       $res = $this->client->get( $this->validateKeyEncoding( $key ), null, $flags );
+                       $res = $client->get( $this->validateKeyEncoding( $key ), null, $flags );
                        if ( is_array( $res ) ) {
                                $result = $res['value'];
                                $casToken = $res['cas'];
                                $casToken = null;
                        }
                } else {
-                       $result = $this->client->get( $this->validateKeyEncoding( $key ), null, $casToken );
+                       $result = $client->get( $this->validateKeyEncoding( $key ), null, $casToken );
                }
-               $result = $this->checkResult( $key, $result );
-               return $result;
+               return $this->checkResult( $key, $result );
        }
  
        protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
                $this->debug( "set($key)" );
-               $result = $this->client->set(
+               $client = $this->acquireSyncClient();
+               $result = $client->set(
                        $this->validateKeyEncoding( $key ),
                        $value,
                        $this->fixExpiry( $exptime )
                );
-               if ( $result === false && $this->client->getResultCode() === Memcached::RES_NOTSTORED ) {
+               return ( $result === false && $client->getResultCode() === Memcached::RES_NOTSTORED )
                        // "Not stored" is always used as the mcrouter response with AllAsyncRoute
-                       return true;
-               }
-               return $this->checkResult( $key, $result );
+                       ? true
+                       : $this->checkResult( $key, $result );
        }
  
        protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
                $this->debug( "cas($key)" );
-               $result = $this->client->cas( $casToken, $this->validateKeyEncoding( $key ),
-                       $value, $this->fixExpiry( $exptime ) );
+               $result = $this->acquireSyncClient()->cas(
+                       $casToken,
+                       $this->validateKeyEncoding( $key ),
+                       $value, $this->fixExpiry( $exptime )
+               );
                return $this->checkResult( $key, $result );
        }
  
        protected function doDelete( $key, $flags = 0 ) {
                $this->debug( "delete($key)" );
-               $result = $this->client->delete( $this->validateKeyEncoding( $key ) );
-               if ( $result === false && $this->client->getResultCode() === Memcached::RES_NOTFOUND ) {
+               $client = $this->acquireSyncClient();
+               $result = $client->delete( $this->validateKeyEncoding( $key ) );
+               return ( $result === false && $client->getResultCode() === Memcached::RES_NOTFOUND )
                        // "Not found" is counted as success in our interface
-                       return true;
-               }
-               return $this->checkResult( $key, $result );
+                       ? true
+                       : $this->checkResult( $key, $result );
        }
  
        public function add( $key, $value, $exptime = 0, $flags = 0 ) {
                $this->debug( "add($key)" );
-               $result = $this->client->add(
+               $result = $this->acquireSyncClient()->add(
                        $this->validateKeyEncoding( $key ),
                        $value,
                        $this->fixExpiry( $exptime )
                );
                return $this->checkResult( $key, $result );
        }
  
        public function incr( $key, $value = 1 ) {
                $this->debug( "incr($key)" );
-               $result = $this->client->increment( $key, $value );
+               $result = $this->acquireSyncClient()->increment( $key, $value );
                return $this->checkResult( $key, $result );
        }
  
        public function decr( $key, $value = 1 ) {
                $this->debug( "decr($key)" );
-               $result = $this->client->decrement( $key, $value );
+               $result = $this->acquireSyncClient()->decrement( $key, $value );
                return $this->checkResult( $key, $result );
        }
  
                if ( $result !== false ) {
                        return $result;
                }
-               switch ( $this->client->getResultCode() ) {
+               $client = $this->syncClient;
+               switch ( $client->getResultCode() ) {
                        case Memcached::RES_SUCCESS:
                                break;
                        case Memcached::RES_DATA_EXISTS:
                        case Memcached::RES_NOTSTORED:
                        case Memcached::RES_NOTFOUND:
-                               $this->debug( "result: " . $this->client->getResultMessage() );
+                               $this->debug( "result: " . $client->getResultMessage() );
                                break;
                        default:
-                               $msg = $this->client->getResultMessage();
+                               $msg = $client->getResultMessage();
                                $logCtx = [];
                                if ( $key !== false ) {
-                                       $server = $this->client->getServerByKey( $key );
+                                       $server = $client->getServerByKey( $key );
                                        $logCtx['memcached-server'] = "{$server['host']}:{$server['port']}";
                                        $logCtx['memcached-key'] = $key;
-                                       $msg = "Memcached error for key \"{memcached-key}\" on server \"{memcached-server}\": $msg";
+                                       $msg = "Memcached error for key \"{memcached-key}\" " .
+                                               "on server \"{memcached-server}\": $msg";
                                } else {
                                        $msg = "Memcached error: $msg";
                                }
                return $result;
        }
  
 -      public function doGetMulti( array $keys, $flags = 0 ) {
 +      protected function doGetMulti( array $keys, $flags = 0 ) {
                $this->debug( 'getMulti(' . implode( ', ', $keys ) . ')' );
                foreach ( $keys as $key ) {
                        $this->validateKeyEncoding( $key );
                }
-               $result = $this->client->getMulti( $keys ) ?: [];
+               // The PECL implementation uses "gets" which works as well as a pipeline
+               $result = $this->acquireSyncClient()->getMulti( $keys ) ?: [];
                return $this->checkResult( false, $result );
        }
  
 -      public function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
 +      protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
                $this->debug( 'setMulti(' . implode( ', ', array_keys( $data ) ) . ')' );
+               $exptime = $this->fixExpiry( $exptime );
                foreach ( array_keys( $data ) as $key ) {
                        $this->validateKeyEncoding( $key );
                }
-               $result = $this->client->setMulti( $data, $this->fixExpiry( $exptime ) );
+               // The PECL implementation is a naïve for-loop so use async I/O to pipeline;
+               // https://github.com/php-memcached-dev/php-memcached/blob/master/php_memcached.c#L1852
+               if ( ( $flags & self::WRITE_BACKGROUND ) == self::WRITE_BACKGROUND ) {
+                       $client = $this->acquireAsyncClient();
+                       $result = $client->setMulti( $data, $exptime );
+                       $this->releaseAsyncClient( $client );
+               } else {
+                       $result = $this->acquireSyncClient()->setMulti( $data, $exptime );
+               }
                return $this->checkResult( false, $result );
        }
  
 -      public function doDeleteMulti( array $keys, $flags = 0 ) {
 +      protected function doDeleteMulti( array $keys, $flags = 0 ) {
                $this->debug( 'deleteMulti(' . implode( ', ', $keys ) . ')' );
                foreach ( $keys as $key ) {
                        $this->validateKeyEncoding( $key );
                }
-               $result = $this->client->deleteMulti( $keys ) ?: [];
-               $ok = true;
-               foreach ( $result as $code ) {
+               // The PECL implementation is a naïve for-loop so use async I/O to pipeline;
+               // https://github.com/php-memcached-dev/php-memcached/blob/7443d16d02fb73cdba2e90ae282446f80969229c/php_memcached.c#L1852
+               if ( ( $flags & self::WRITE_BACKGROUND ) == self::WRITE_BACKGROUND ) {
+                       $client = $this->acquireAsyncClient();
+                       $resultArray = $client->deleteMulti( $keys ) ?: [];
+                       $this->releaseAsyncClient( $client );
+               } else {
+                       $resultArray = $this->acquireSyncClient()->deleteMulti( $keys ) ?: [];
+               }
+               $result = true;
+               foreach ( $resultArray as $code ) {
                        if ( !in_array( $code, [ true, Memcached::RES_NOTFOUND ], true ) ) {
                                // "Not found" is counted as success in our interface
-                               $ok = false;
+                               $result = false;
                        }
                }
-               return $this->checkResult( false, $ok );
+               return $this->checkResult( false, $result );
        }
  
        protected function doChangeTTL( $key, $exptime, $flags ) {
                $this->debug( "touch($key)" );
-               $result = $this->client->touch( $key, $exptime );
+               $result = $this->acquireSyncClient()->touch( $key, $this->fixExpiry( $exptime ) );
                return $this->checkResult( $key, $result );
        }
  
                        return $value;
                }
  
-               $serializer = $this->client->getOption( Memcached::OPT_SERIALIZER );
+               $serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );
                if ( $serializer === Memcached::SERIALIZER_PHP ) {
                        return serialize( $value );
                } elseif ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
                        return (int)$value;
                }
  
-               $serializer = $this->client->getOption( Memcached::OPT_SERIALIZER );
+               $serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );
                if ( $serializer === Memcached::SERIALIZER_PHP ) {
                        return unserialize( $value );
                } elseif ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
  
                throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
        }
+       /**
+        * @return Memcached
+        */
+       private function acquireSyncClient() {
+               if ( $this->syncClientIsBuffering ) {
+                       throw new RuntimeException( "The main (unbuffered I/O) client is locked" );
+               }
+               if ( $this->hasUnflushedChanges ) {
+                       // Force a synchronous flush of async writes so that their changes are visible
+                       $this->syncClient->fetch();
+                       if ( $this->asyncClient ) {
+                               $this->asyncClient->fetch();
+                       }
+                       $this->hasUnflushedChanges = false;
+               }
+               return $this->syncClient;
+       }
+       /**
+        * @return Memcached
+        */
+       private function acquireAsyncClient() {
+               if ( $this->asyncClient ) {
+                       return $this->asyncClient; // dedicated buffering instance
+               }
+               // Modify the main instance to temporarily buffer writes
+               $this->syncClientIsBuffering = true;
+               $this->syncClient->setOptions( self::$OPTS_ASYNC_WRITES );
+               return $this->syncClient;
+       }
+       /**
+        * @param Memcached $client
+        */
+       private function releaseAsyncClient( $client ) {
+               $this->hasUnflushedChanges = true;
+               if ( !$this->asyncClient ) {
+                       // This is the main instance; make it stop buffering writes again
+                       $client->setOptions( self::$OPTS_SYNC_WRITES );
+                       $this->syncClientIsBuffering = false;
+               }
+       }
  }