/** @var int[] Map of (ATTR_* class constant => QOS_* class constant) */
protected $attrMap = [];
- /** Bitfield constants for get()/getMulti() */
- const READ_LATEST = 1; // use latest data for replicated stores
- const READ_VERIFIED = 2; // promise that caller can tell when keys are stale
- /** Bitfield constants for set()/merge() */
- const WRITE_SYNC = 4; // synchronously write to all locations for replicated stores
- const WRITE_CACHE_ONLY = 8; // Only change state of the in-memory cache
- const WRITE_ALLOW_SEGMENTS = 16; // Allow partitioning of the value if it is large
- const WRITE_PRUNE_SEGMENTS = 32; // Delete all partition segments of the value
+ /** Bitfield constants for get()/getMulti(); these are only advisory */
+ const READ_LATEST = 1; // if supported, avoid reading stale data due to replication
+ const READ_VERIFIED = 2; // promise that the caller handles detection of staleness
+ /** Bitfield constants for set()/merge(); these are only advisory */
+ const WRITE_SYNC = 4; // if supported, block until the write is fully replicated
+ const WRITE_CACHE_ONLY = 8; // only change state of the in-memory cache
+ const WRITE_ALLOW_SEGMENTS = 16; // allow partitioning of the value if it is large
+ const WRITE_PRUNE_SEGMENTS = 32; // delete all the segments if the value is partitioned
+ const WRITE_BACKGROUND = 64; // if supported,
/** @var string Component to use for key construction of blob segment keys */
const SEGMENT_COMPONENT = 'segment';
* @param int $flags Bitfield of BagOStuff::WRITE_* constants
* @return bool Success
*/
- protected function mergeViaCas( $key, $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
+ final protected function mergeViaCas( $key, callable $callback, $exptime, $attempts, $flags ) {
do {
$casToken = null; // passed by reference
// Get the old value and CAS token from cache
/**
* Delete all objects expiring before a certain date.
* @param string|int $timestamp The reference date in MW or TS_UNIX format
- * @param callable|null $progressCallback Optional, a function which will be called
+ * @param callable|null $progress Optional, a function which will be called
* regularly during long-running operations with the percentage progress
* as the first parameter. [optional]
* @param int $limit Maximum number of keys to delete [default: INF]
*
- * @return bool Success, false if unimplemented
+ * @return bool Success; false if unimplemented
*/
public function deleteObjectsExpiringBefore(
$timestamp,
- callable $progressCallback = null,
+ callable $progress = null,
$limit = INF
) {
- // stub
return false;
}
/**
* Get an associative array containing the item for each of the keys that have items.
- * @param string[] $keys List of keys
+ * @param string[] $keys List of keys; can be a map of (unused => key) for convenience
* @param int $flags Bitfield; supports READ_LATEST [optional]
- * @return array Map of (key => value) for existing keys
+ * @return mixed[] Map of (key => value) for existing keys; preserves the order of $keys
*/
public function getMulti( array $keys, $flags = 0 ) {
- $valuesBykey = $this->doGetMulti( $keys, $flags );
- foreach ( $valuesBykey as $key => $value ) {
+ $foundByKey = $this->doGetMulti( $keys, $flags );
+
+ $res = [];
+ foreach ( $keys as $key ) {
// Resolve one blob at a time (avoids too much I/O at once)
- $valuesBykey[$key] = $this->resolveSegments( $key, $value );
+ if ( array_key_exists( $key, $foundByKey ) ) {
+ // A value should not appear in the key if a segment is missing
+ $value = $this->resolveSegments( $key, $foundByKey[$key] );
+ if ( $value !== false ) {
+ $res[$key] = $value;
+ }
+ }
}
- return $valuesBykey;
+ return $res;
}
/**
* Get an associative array containing the item for each of the keys that have items.
* @param string[] $keys List of keys
* @param int $flags Bitfield; supports READ_LATEST [optional]
- * @return array Map of (key => value) for existing keys
+ * @return mixed[] Map of (key => value) for existing keys
*/
protected function doGetMulti( array $keys, $flags = 0 ) {
$res = [];
*
* This does not support WRITE_ALLOW_SEGMENTS to avoid excessive read I/O
*
+ * WRITE_BACKGROUND can be used for bulk insertion where the response is not vital
+ *
* @param mixed[] $data Map of (key => value)
* @param int $exptime Either an interval in seconds or a unix timestamp for expiry
* @param int $flags Bitfield of BagOStuff::WRITE_* constants (since 1.33)
* @return bool Success
* @since 1.24
*/
- final public function setMulti( array $data, $exptime = 0, $flags = 0 ) {
+ public function setMulti( array $data, $exptime = 0, $flags = 0 ) {
if ( ( $flags & self::WRITE_ALLOW_SEGMENTS ) === self::WRITE_ALLOW_SEGMENTS ) {
throw new InvalidArgumentException( __METHOD__ . ' got WRITE_ALLOW_SEGMENTS' );
}
-
return $this->doSetMulti( $data, $exptime, $flags );
}
foreach ( $data as $key => $value ) {
$res = $this->doSet( $key, $value, $exptime, $flags ) && $res;
}
-
return $res;
}
*
* This does not support WRITE_ALLOW_SEGMENTS to avoid excessive read I/O
*
+ * WRITE_BACKGROUND can be used for bulk deletion where the response is not vital
+ *
* @param string[] $keys List of keys
* @param int $flags Bitfield of BagOStuff::WRITE_* constants
* @return bool Success
* @since 1.33
*/
- final public function deleteMulti( array $keys, $flags = 0 ) {
+ public function deleteMulti( array $keys, $flags = 0 ) {
if ( ( $flags & self::WRITE_ALLOW_SEGMENTS ) === self::WRITE_ALLOW_SEGMENTS ) {
throw new InvalidArgumentException( __METHOD__ . ' got WRITE_ALLOW_SEGMENTS' );
}
-
return $this->doDeleteMulti( $keys, $flags );
}
foreach ( $keys as $key ) {
$res = $this->doDelete( $key, $flags ) && $res;
}
-
return $res;
}
* @param mixed $mainValue
* @return string|null|bool The combined string, false if missing, null on error
*/
- protected function resolveSegments( $key, $mainValue ) {
+ final protected function resolveSegments( $key, $mainValue ) {
if ( SerializedValueContainer::isUnified( $mainValue ) ) {
return $this->unserialize( $mainValue->{SerializedValueContainer::UNIFIED_DATA} );
}
* @param callable $workCallback
* @since 1.28
*/
- public function addBusyCallback( callable $workCallback ) {
+ final public function addBusyCallback( callable $workCallback ) {
$this->busyCallbacks[] = $workCallback;
}
*/
protected function debug( $text ) {
if ( $this->debugMode ) {
- $this->logger->debug( "{class} debug: $text", [
- 'class' => static::class,
- ] );
+ $this->logger->debug( "{class} debug: $text", [ 'class' => static::class ] );
}
}
* @param int $exptime
* @return bool
*/
- protected function expiryIsRelative( $exptime ) {
+ final protected function expiryIsRelative( $exptime ) {
return ( $exptime != 0 && $exptime < ( 10 * self::TTL_YEAR ) );
}
* @param int $exptime Absolute TTL or 0 for indefinite
* @return int
*/
- protected function convertToExpiry( $exptime ) {
- $exptime = (int)$exptime; // sanity
-
+ final protected function convertToExpiry( $exptime ) {
return $this->expiryIsRelative( $exptime )
? (int)$this->getCurrentTime() + $exptime
: $exptime;
* @param int $exptime
* @return int
*/
- protected function convertToRelative( $exptime ) {
- if ( $exptime >= ( 10 * self::TTL_YEAR ) ) {
- $exptime -= (int)$this->getCurrentTime();
- if ( $exptime <= 0 ) {
- $exptime = 1;
- }
- return $exptime;
- } else {
- return $exptime;
- }
+ final protected function convertToRelative( $exptime ) {
+ return $this->expiryIsRelative( $exptime )
+ ? (int)$exptime
+ : max( $exptime - (int)$this->getCurrentTime(), 1 );
}
/**
* @param mixed $value
* @return bool
*/
- protected function isInteger( $value ) {
+ final protected function isInteger( $value ) {
if ( is_int( $value ) ) {
return true;
} elseif ( !is_string( $value ) ) {
* @param BagOStuff[] $bags
* @return int[] Resulting flag map (class ATTR_* constant => class QOS_* constant)
*/
- protected function mergeFlagMaps( array $bags ) {
+ final protected function mergeFlagMaps( array $bags ) {
$map = [];
foreach ( $bags as $bag ) {
foreach ( $bag->attrMap as $attr => $rank ) {
*/
class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
/** @var Memcached */
- protected $client;
+ protected $syncClient;
+ /** @var Memcached|null */
+ protected $asyncClient;
+
+ /** @var bool Whether the non-buffering client is locked from use */
+ protected $syncClientIsBuffering = false;
+ /** @var bool Whether the non-buffering client should be flushed before use */
+ protected $hasUnflushedChanges = false;
+
+ /** @var array Memcached options */
+ private static $OPTS_SYNC_WRITES = [
+ Memcached::OPT_NO_BLOCK => false, // async I/O (using TCP buffers)
+ Memcached::OPT_BUFFER_WRITES => false // libmemcached buffers
+ ];
+ /** @var array Memcached options */
+ private static $OPTS_ASYNC_WRITES = [
+ Memcached::OPT_NO_BLOCK => true, // async I/O (using TCP buffers)
+ Memcached::OPT_BUFFER_WRITES => true // libmemcached buffers
+ ];
/**
* Available parameters are:
// The Memcached object is essentially shared for each pool ID.
// We can only reuse a pool ID if we keep the config consistent.
$connectionPoolId = md5( serialize( $params ) );
- $client = new Memcached( $connectionPoolId );
- $this->initializeClient( $client, $params );
+ $syncClient = new Memcached( "$connectionPoolId-sync" );
+ // Avoid clobbering the main thread-shared Memcached instance
+ $asyncClient = new Memcached( "$connectionPoolId-async" );
} else {
- $client = new Memcached;
- $this->initializeClient( $client, $params );
+ $syncClient = new Memcached();
+ $asyncClient = null;
}
- $this->client = $client;
+ $this->initializeClient( $syncClient, $params, self::$OPTS_SYNC_WRITES );
+ if ( $asyncClient ) {
+ $this->initializeClient( $asyncClient, $params, self::$OPTS_ASYNC_WRITES );
+ }
+ // Set the main client and any dedicated one for buffered writes
+ $this->syncClient = $syncClient;
+ $this->asyncClient = $asyncClient;
// The compression threshold is an undocumented php.ini option for some
// reason. There's probably not much harm in setting it globally, for
// compatibility with the settings for the PHP client.
*
* @param Memcached $client
* @param array $params
+ * @param array $options Base options for Memcached::setOptions()
* @throws RuntimeException
*/
- private function initializeClient( Memcached $client, array $params ) {
+ private function initializeClient( Memcached $client, array $params, array $options ) {
if ( $client->getServerList() ) {
$this->logger->debug( __METHOD__ . ": pre-initialized client instance." );
$this->logger->debug( __METHOD__ . ": initializing new client instance." );
- $options = [
+ $options += [
+ Memcached::OPT_NO_BLOCK => false,
+ Memcached::OPT_BUFFER_WRITES => false,
// Network protocol (ASCII or binary)
Memcached::OPT_BINARY_PROTOCOL => $params['use_binary_protocol'],
// Set various network timeouts
protected function doGet( $key, $flags = 0, &$casToken = null ) {
$this->debug( "get($key)" );
+
+ $client = $this->acquireSyncClient();
if ( defined( Memcached::class . '::GET_EXTENDED' ) ) { // v3.0.0
/** @noinspection PhpUndefinedClassConstantInspection */
$flags = Memcached::GET_EXTENDED;
- $res = $this->client->get( $this->validateKeyEncoding( $key ), null, $flags );
+ $res = $client->get( $this->validateKeyEncoding( $key ), null, $flags );
if ( is_array( $res ) ) {
$result = $res['value'];
$casToken = $res['cas'];
$casToken = null;
}
} else {
- $result = $this->client->get( $this->validateKeyEncoding( $key ), null, $casToken );
+ $result = $client->get( $this->validateKeyEncoding( $key ), null, $casToken );
}
- $result = $this->checkResult( $key, $result );
- return $result;
+
+ return $this->checkResult( $key, $result );
}
protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
$this->debug( "set($key)" );
- $result = $this->client->set(
+
+ $client = $this->acquireSyncClient();
+ $result = $client->set(
$this->validateKeyEncoding( $key ),
$value,
$this->fixExpiry( $exptime )
);
- if ( $result === false && $this->client->getResultCode() === Memcached::RES_NOTSTORED ) {
+
+ return ( $result === false && $client->getResultCode() === Memcached::RES_NOTSTORED )
// "Not stored" is always used as the mcrouter response with AllAsyncRoute
- return true;
- }
- return $this->checkResult( $key, $result );
+ ? true
+ : $this->checkResult( $key, $result );
}
protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
$this->debug( "cas($key)" );
- $result = $this->client->cas( $casToken, $this->validateKeyEncoding( $key ),
- $value, $this->fixExpiry( $exptime ) );
+
+ $result = $this->acquireSyncClient()->cas(
+ $casToken,
+ $this->validateKeyEncoding( $key ),
+ $value, $this->fixExpiry( $exptime )
+ );
+
return $this->checkResult( $key, $result );
}
protected function doDelete( $key, $flags = 0 ) {
$this->debug( "delete($key)" );
- $result = $this->client->delete( $this->validateKeyEncoding( $key ) );
- if ( $result === false && $this->client->getResultCode() === Memcached::RES_NOTFOUND ) {
+
+ $client = $this->acquireSyncClient();
+ $result = $client->delete( $this->validateKeyEncoding( $key ) );
+
+ return ( $result === false && $client->getResultCode() === Memcached::RES_NOTFOUND )
// "Not found" is counted as success in our interface
- return true;
- }
- return $this->checkResult( $key, $result );
+ ? true
+ : $this->checkResult( $key, $result );
}
public function add( $key, $value, $exptime = 0, $flags = 0 ) {
$this->debug( "add($key)" );
- $result = $this->client->add(
+
+ $result = $this->acquireSyncClient()->add(
$this->validateKeyEncoding( $key ),
$value,
$this->fixExpiry( $exptime )
);
+
return $this->checkResult( $key, $result );
}
public function incr( $key, $value = 1 ) {
$this->debug( "incr($key)" );
- $result = $this->client->increment( $key, $value );
+
+ $result = $this->acquireSyncClient()->increment( $key, $value );
+
return $this->checkResult( $key, $result );
}
public function decr( $key, $value = 1 ) {
$this->debug( "decr($key)" );
- $result = $this->client->decrement( $key, $value );
+
+ $result = $this->acquireSyncClient()->decrement( $key, $value );
+
return $this->checkResult( $key, $result );
}
if ( $result !== false ) {
return $result;
}
- switch ( $this->client->getResultCode() ) {
+
+ $client = $this->syncClient;
+ switch ( $client->getResultCode() ) {
case Memcached::RES_SUCCESS:
break;
case Memcached::RES_DATA_EXISTS:
case Memcached::RES_NOTSTORED:
case Memcached::RES_NOTFOUND:
- $this->debug( "result: " . $this->client->getResultMessage() );
+ $this->debug( "result: " . $client->getResultMessage() );
break;
default:
- $msg = $this->client->getResultMessage();
+ $msg = $client->getResultMessage();
$logCtx = [];
if ( $key !== false ) {
- $server = $this->client->getServerByKey( $key );
+ $server = $client->getServerByKey( $key );
$logCtx['memcached-server'] = "{$server['host']}:{$server['port']}";
$logCtx['memcached-key'] = $key;
- $msg = "Memcached error for key \"{memcached-key}\" on server \"{memcached-server}\": $msg";
+ $msg = "Memcached error for key \"{memcached-key}\" " .
+ "on server \"{memcached-server}\": $msg";
} else {
$msg = "Memcached error: $msg";
}
return $result;
}
- public function doGetMulti( array $keys, $flags = 0 ) {
+ protected function doGetMulti( array $keys, $flags = 0 ) {
$this->debug( 'getMulti(' . implode( ', ', $keys ) . ')' );
+
foreach ( $keys as $key ) {
$this->validateKeyEncoding( $key );
}
- $result = $this->client->getMulti( $keys ) ?: [];
+
+ // The PECL implementation uses "gets" which works as well as a pipeline
+ $result = $this->acquireSyncClient()->getMulti( $keys ) ?: [];
+
return $this->checkResult( false, $result );
}
- public function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
+ protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
$this->debug( 'setMulti(' . implode( ', ', array_keys( $data ) ) . ')' );
+
+ $exptime = $this->fixExpiry( $exptime );
foreach ( array_keys( $data ) as $key ) {
$this->validateKeyEncoding( $key );
}
- $result = $this->client->setMulti( $data, $this->fixExpiry( $exptime ) );
+
+ // The PECL implementation is a naïve for-loop so use async I/O to pipeline;
+ // https://github.com/php-memcached-dev/php-memcached/blob/master/php_memcached.c#L1852
+ if ( ( $flags & self::WRITE_BACKGROUND ) == self::WRITE_BACKGROUND ) {
+ $client = $this->acquireAsyncClient();
+ $result = $client->setMulti( $data, $exptime );
+ $this->releaseAsyncClient( $client );
+ } else {
+ $result = $this->acquireSyncClient()->setMulti( $data, $exptime );
+ }
+
return $this->checkResult( false, $result );
}
- public function doDeleteMulti( array $keys, $flags = 0 ) {
+ protected function doDeleteMulti( array $keys, $flags = 0 ) {
$this->debug( 'deleteMulti(' . implode( ', ', $keys ) . ')' );
+
foreach ( $keys as $key ) {
$this->validateKeyEncoding( $key );
}
- $result = $this->client->deleteMulti( $keys ) ?: [];
- $ok = true;
- foreach ( $result as $code ) {
+
+ // The PECL implementation is a naïve for-loop so use async I/O to pipeline;
+ // https://github.com/php-memcached-dev/php-memcached/blob/7443d16d02fb73cdba2e90ae282446f80969229c/php_memcached.c#L1852
+ if ( ( $flags & self::WRITE_BACKGROUND ) == self::WRITE_BACKGROUND ) {
+ $client = $this->acquireAsyncClient();
+ $resultArray = $client->deleteMulti( $keys ) ?: [];
+ $this->releaseAsyncClient( $client );
+ } else {
+ $resultArray = $this->acquireSyncClient()->deleteMulti( $keys ) ?: [];
+ }
+
+ $result = true;
+ foreach ( $resultArray as $code ) {
if ( !in_array( $code, [ true, Memcached::RES_NOTFOUND ], true ) ) {
// "Not found" is counted as success in our interface
- $ok = false;
+ $result = false;
}
}
- return $this->checkResult( false, $ok );
+
+ return $this->checkResult( false, $result );
}
protected function doChangeTTL( $key, $exptime, $flags ) {
$this->debug( "touch($key)" );
- $result = $this->client->touch( $key, $exptime );
+
+ $result = $this->acquireSyncClient()->touch( $key, $this->fixExpiry( $exptime ) );
+
return $this->checkResult( $key, $result );
}
return $value;
}
- $serializer = $this->client->getOption( Memcached::OPT_SERIALIZER );
+ $serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );
if ( $serializer === Memcached::SERIALIZER_PHP ) {
return serialize( $value );
} elseif ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
return (int)$value;
}
- $serializer = $this->client->getOption( Memcached::OPT_SERIALIZER );
+ $serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );
if ( $serializer === Memcached::SERIALIZER_PHP ) {
return unserialize( $value );
} elseif ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
}
+
+ /**
+ * @return Memcached
+ */
+ private function acquireSyncClient() {
+ if ( $this->syncClientIsBuffering ) {
+ throw new RuntimeException( "The main (unbuffered I/O) client is locked" );
+ }
+
+ if ( $this->hasUnflushedChanges ) {
+ // Force a synchronous flush of async writes so that their changes are visible
+ $this->syncClient->fetch();
+ if ( $this->asyncClient ) {
+ $this->asyncClient->fetch();
+ }
+ $this->hasUnflushedChanges = false;
+ }
+
+ return $this->syncClient;
+ }
+
+ /**
+ * @return Memcached
+ */
+ private function acquireAsyncClient() {
+ if ( $this->asyncClient ) {
+ return $this->asyncClient; // dedicated buffering instance
+ }
+
+ // Modify the main instance to temporarily buffer writes
+ $this->syncClientIsBuffering = true;
+ $this->syncClient->setOptions( self::$OPTS_ASYNC_WRITES );
+
+ return $this->syncClient;
+ }
+
+ /**
+ * @param Memcached $client
+ */
+ private function releaseAsyncClient( $client ) {
+ $this->hasUnflushedChanges = true;
+
+ if ( !$this->asyncClient ) {
+ // This is the main instance; make it stop buffering writes again
+ $client->setOptions( self::$OPTS_SYNC_WRITES );
+ $this->syncClientIsBuffering = false;
+ }
+ }
}