Merge "objectcache: optimize MemcachedPeclBagOStuff::*Multi() write methods"
[lhc/web/wiklou.git] / includes / libs / objectcache / MemcachedPeclBagOStuff.php
1 <?php
2 /**
3 * Object caching using memcached.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Cache
22 */
23
/**
 * A wrapper class for the PECL memcached client
 *
 * Up to two Memcached handles are used:
 *  - the "sync" client, configured for unbuffered, blocking writes and used
 *    for every read and for all regular writes;
 *  - an optional "async" client, configured for buffered, non-blocking writes
 *    and only created when persistent connections are enabled. Without it,
 *    the sync client is temporarily switched into buffering mode for
 *    WRITE_BACKGROUND operations and switched back afterwards.
 *
 * @ingroup Cache
 */
class MemcachedPeclBagOStuff extends MemcachedBagOStuff {
	/** @var Memcached Client used for reads and unbuffered writes */
	protected $syncClient;
	/** @var Memcached|null Dedicated client for buffered writes (persistent mode only) */
	protected $asyncClient;

	/** @var bool Whether the non-buffering client is locked from use */
	protected $syncClientIsBuffering = false;
	/** @var bool Whether the non-buffering client should be flushed before use */
	protected $hasUnflushedChanges = false;

	/** @var array Memcached options for unbuffered, blocking writes */
	private static $OPTS_SYNC_WRITES = [
		Memcached::OPT_NO_BLOCK => false, // async I/O (using TCP buffers)
		Memcached::OPT_BUFFER_WRITES => false // libmemcached buffers
	];
	/** @var array Memcached options for buffered, non-blocking writes */
	private static $OPTS_ASYNC_WRITES = [
		Memcached::OPT_NO_BLOCK => true, // async I/O (using TCP buffers)
		Memcached::OPT_BUFFER_WRITES => true // libmemcached buffers
	];

	/**
	 * Available parameters are:
	 *   - servers:              List of IP:port combinations holding the memcached servers.
	 *   - persistent:           Whether to use a persistent connection (default: false)
	 *   - compress_threshold:   The minimum size an object must be before it is compressed
	 *   - timeout:              The read timeout in microseconds
	 *   - connect_timeout:      The connect timeout in seconds
	 *   - retry_timeout:        Time in seconds to wait before retrying a failed connect attempt
	 *   - server_failure_limit: Limit for server connect failures before it is removed
	 *   - serializer:           Either "php" or "igbinary". Igbinary produces more compact
	 *                           values, but serialization is much slower unless the php.ini
	 *                           option igbinary.compact_strings is off.
	 *   - use_binary_protocol   Whether to enable the binary protocol (default is ASCII)
	 *   - allow_tcp_nagle_delay Whether to permit Nagle's algorithm for reducing packet count
	 * @param array $params
	 * @throws RuntimeException If a client cannot be configured
	 */
	public function __construct( $params ) {
		parent::__construct( $params );

		// Default class-specific parameters.
		// NOTE(review): 'servers' and 'timeout' get no default here; they are
		// presumably always supplied by the caller - confirm.
		$params += [
			'persistent' => false,
			'compress_threshold' => 1500,
			'connect_timeout' => 0.5,
			'serializer' => 'php',
			'use_binary_protocol' => false,
			'allow_tcp_nagle_delay' => true
		];

		if ( $params['persistent'] ) {
			// The pool ID must be unique to the server/option combination.
			// The Memcached object is essentially shared for each pool ID.
			// We can only reuse a pool ID if we keep the config consistent.
			$connectionPoolId = md5( serialize( $params ) );
			$syncClient = new Memcached( "$connectionPoolId-sync" );
			// Avoid clobbering the main thread-shared Memcached instance
			$asyncClient = new Memcached( "$connectionPoolId-async" );
		} else {
			$syncClient = new Memcached();
			$asyncClient = null;
		}

		$this->initializeClient( $syncClient, $params, self::$OPTS_SYNC_WRITES );
		if ( $asyncClient ) {
			$this->initializeClient( $asyncClient, $params, self::$OPTS_ASYNC_WRITES );
		}

		// Set the main client and any dedicated one for buffered writes
		$this->syncClient = $syncClient;
		$this->asyncClient = $asyncClient;
		// The compression threshold is an undocumented php.ini option for some
		// reason. There's probably not much harm in setting it globally, for
		// compatibility with the settings for the PHP client.
		ini_set( 'memcached.compression_threshold', $params['compress_threshold'] );
	}

	/**
	 * Initialize the client only if needed and reuse it otherwise.
	 * This avoids duplicate servers in the list and new connections.
	 *
	 * @param Memcached $client
	 * @param array $params
	 * @param array $options Base options for Memcached::setOptions()
	 * @throws RuntimeException
	 */
	private function initializeClient( Memcached $client, array $params, array $options ) {
		if ( $client->getServerList() ) {
			$this->logger->debug( __METHOD__ . ": pre-initialized client instance." );

			return; // preserve persistent handle
		}

		$this->logger->debug( __METHOD__ . ": initializing new client instance." );

		// Entries already present in $options take precedence over these
		$options += [
			Memcached::OPT_NO_BLOCK => false,
			Memcached::OPT_BUFFER_WRITES => false,
			// Network protocol (ASCII or binary)
			Memcached::OPT_BINARY_PROTOCOL => $params['use_binary_protocol'],
			// Set various network timeouts
			Memcached::OPT_CONNECT_TIMEOUT => $params['connect_timeout'] * 1000,
			Memcached::OPT_SEND_TIMEOUT => $params['timeout'],
			Memcached::OPT_RECV_TIMEOUT => $params['timeout'],
			Memcached::OPT_POLL_TIMEOUT => $params['timeout'] / 1000,
			// Avoid pointless delay when sending/fetching large blobs
			Memcached::OPT_TCP_NODELAY => !$params['allow_tcp_nagle_delay'],
			// Set libketama mode since it's recommended by the documentation
			Memcached::OPT_LIBKETAMA_COMPATIBLE => true
		];
		if ( isset( $params['retry_timeout'] ) ) {
			$options[Memcached::OPT_RETRY_TIMEOUT] = $params['retry_timeout'];
		}
		if ( isset( $params['server_failure_limit'] ) ) {
			$options[Memcached::OPT_SERVER_FAILURE_LIMIT] = $params['server_failure_limit'];
		}
		if ( $params['serializer'] === 'php' ) {
			$options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_PHP;
		} elseif ( $params['serializer'] === 'igbinary' ) {
			if ( !Memcached::HAVE_IGBINARY ) {
				throw new RuntimeException(
					__CLASS__ . ': the igbinary extension is not available ' .
					'but igbinary serialization was requested.'
				);
			}
			$options[Memcached::OPT_SERIALIZER] = Memcached::SERIALIZER_IGBINARY;
		}

		if ( !$client->setOptions( $options ) ) {
			throw new RuntimeException(
				"Invalid options: " . json_encode( $options, JSON_PRETTY_PRINT )
			);
		}

		$servers = [];
		foreach ( $params['servers'] as $host ) {
			if ( preg_match( '/^\[(.+)\]:(\d+)$/', $host, $m ) ) {
				$servers[] = [ $m[1], (int)$m[2] ]; // (ip, port)
			} elseif ( preg_match( '/^([^:]+):(\d+)$/', $host, $m ) ) {
				$servers[] = [ $m[1], (int)$m[2] ]; // (ip or path, port)
			} else {
				$servers[] = [ $host, false ]; // (ip or path, port)
			}
		}

		if ( !$client->addServers( $servers ) ) {
			throw new RuntimeException( "Failed to inject server address list" );
		}
	}

	/**
	 * Fetch a single value and its CAS token.
	 *
	 * @param string $key
	 * @param int $flags Not used by this implementation
	 * @param mixed|null &$casToken Set to the CAS token of the value, if any
	 * @return mixed The stored value, or false if the lookup failed
	 */
	protected function doGet( $key, $flags = 0, &$casToken = null ) {
		$this->debug( "get($key)" );

		$client = $this->acquireSyncClient();
		if ( defined( Memcached::class . '::GET_EXTENDED' ) ) { // v3.0.0
			// Use a local rather than overwriting the caller-supplied $flags
			/** @noinspection PhpUndefinedClassConstantInspection */
			$getFlags = Memcached::GET_EXTENDED;
			$res = $client->get( $this->validateKeyEncoding( $key ), null, $getFlags );
			if ( is_array( $res ) ) {
				$result = $res['value'];
				$casToken = $res['cas'];
			} else {
				$result = false;
				$casToken = null;
			}
		} else {
			$result = $client->get( $this->validateKeyEncoding( $key ), null, $casToken );
		}

		return $this->checkResult( $key, $result );
	}

	/**
	 * Store a single value.
	 *
	 * @param string $key
	 * @param mixed $value
	 * @param int $exptime
	 * @param int $flags Not used by this implementation
	 * @return bool
	 */
	protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
		$this->debug( "set($key)" );

		$client = $this->acquireSyncClient();
		$result = $client->set(
			$this->validateKeyEncoding( $key ),
			$value,
			$this->fixExpiry( $exptime )
		);

		return ( $result === false && $client->getResultCode() === Memcached::RES_NOTSTORED )
			// "Not stored" is always used as the mcrouter response with AllAsyncRoute
			? true
			: $this->checkResult( $key, $result );
	}

	/**
	 * Store a value only if the given CAS token still matches.
	 *
	 * @param mixed $casToken
	 * @param string $key
	 * @param mixed $value
	 * @param int $exptime
	 * @param int $flags Not used by this implementation
	 * @return bool
	 */
	protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
		$this->debug( "cas($key)" );

		$result = $this->acquireSyncClient()->cas(
			$casToken,
			$this->validateKeyEncoding( $key ),
			$value,
			$this->fixExpiry( $exptime )
		);

		return $this->checkResult( $key, $result );
	}

	/**
	 * Delete a single key.
	 *
	 * @param string $key
	 * @param int $flags Not used by this implementation
	 * @return bool
	 */
	protected function doDelete( $key, $flags = 0 ) {
		$this->debug( "delete($key)" );

		$client = $this->acquireSyncClient();
		$result = $client->delete( $this->validateKeyEncoding( $key ) );

		return ( $result === false && $client->getResultCode() === Memcached::RES_NOTFOUND )
			// "Not found" is counted as success in our interface
			? true
			: $this->checkResult( $key, $result );
	}

	/**
	 * Store a value only if the key does not exist yet.
	 *
	 * @param string $key
	 * @param mixed $value
	 * @param int $exptime
	 * @param int $flags Not used by this implementation
	 * @return bool
	 */
	public function add( $key, $value, $exptime = 0, $flags = 0 ) {
		$this->debug( "add($key)" );

		$result = $this->acquireSyncClient()->add(
			$this->validateKeyEncoding( $key ),
			$value,
			$this->fixExpiry( $exptime )
		);

		return $this->checkResult( $key, $result );
	}

	/**
	 * Increment the numeric value stored under a key.
	 *
	 * @param string $key
	 * @param int $value Amount to add
	 * @return int|bool New value, or false on failure
	 */
	public function incr( $key, $value = 1 ) {
		$this->debug( "incr($key)" );

		// Validate the key like every other single-key operation does
		$result = $this->acquireSyncClient()->increment(
			$this->validateKeyEncoding( $key ),
			$value
		);

		return $this->checkResult( $key, $result );
	}

	/**
	 * Decrement the numeric value stored under a key.
	 *
	 * @param string $key
	 * @param int $value Amount to subtract
	 * @return int|bool New value, or false on failure
	 */
	public function decr( $key, $value = 1 ) {
		$this->debug( "decr($key)" );

		// Validate the key like every other single-key operation does
		$result = $this->acquireSyncClient()->decrement(
			$this->validateKeyEncoding( $key ),
			$value
		);

		return $this->checkResult( $key, $result );
	}

	/**
	 * Check the return value from a client method call and take any necessary
	 * action. Returns the value that the wrapper function should return. At
	 * present, the return value is always the same as the return value from
	 * the client, but some day we might find a case where it should be
	 * different.
	 *
	 * The result code is read from the main (sync) client.
	 *
	 * @param string|bool $key The key used by the caller, or false if there wasn't one.
	 * @param mixed $result The return value
	 * @return mixed
	 */
	protected function checkResult( $key, $result ) {
		return $this->checkClientResult( $this->syncClient, $key, $result );
	}

	/**
	 * Check the return value of a call made on a specific client handle.
	 *
	 * A result of false is inspected through the result code of $client:
	 * unexpected codes are logged and recorded as the last error, while
	 * "data exists", "not stored" and "not found" are only sent to debug().
	 *
	 * @param Memcached $client The handle the operation was actually issued on
	 * @param string|bool $key The key used by the caller, or false if there wasn't one.
	 * @param mixed $result The return value
	 * @return mixed $result, unchanged
	 */
	private function checkClientResult( Memcached $client, $key, $result ) {
		if ( $result !== false ) {
			return $result;
		}

		switch ( $client->getResultCode() ) {
			case Memcached::RES_SUCCESS:
				break;
			case Memcached::RES_DATA_EXISTS:
			case Memcached::RES_NOTSTORED:
			case Memcached::RES_NOTFOUND:
				$this->debug( "result: " . $client->getResultMessage() );
				break;
			default:
				$msg = $client->getResultMessage();
				$logCtx = [];
				if ( $key !== false ) {
					// getServerByKey() can itself fail and return false
					$server = $client->getServerByKey( $key );
					$logCtx['memcached-server'] = is_array( $server )
						? "{$server['host']}:{$server['port']}"
						: 'unknown';
					$logCtx['memcached-key'] = $key;
					$msg = "Memcached error for key \"{memcached-key}\" " .
						"on server \"{memcached-server}\": $msg";
				} else {
					$msg = "Memcached error: $msg";
				}
				$this->logger->error( $msg, $logCtx );
				$this->setLastError( BagOStuff::ERR_UNEXPECTED );
		}

		return $result;
	}

	/**
	 * Fetch several values at once.
	 *
	 * @param string[] $keys
	 * @param int $flags Not used by this implementation
	 * @return array Map of (key => value) for the keys that were found;
	 *  empty if the lookup failed
	 */
	protected function doGetMulti( array $keys, $flags = 0 ) {
		$this->debug( 'getMulti(' . implode( ', ', $keys ) . ')' );

		foreach ( $keys as $key ) {
			$this->validateKeyEncoding( $key );
		}

		// The PECL implementation uses "gets" which works as well as a pipeline
		$result = $this->acquireSyncClient()->getMulti( $keys );
		// Check the raw result first so that a failure (false) is actually seen;
		// only afterwards normalize it to an array for the caller.
		$result = $this->checkResult( false, $result );

		return is_array( $result ) ? $result : [];
	}

	/**
	 * Store several values at once.
	 *
	 * @param array $data Map of (key => value)
	 * @param int $exptime
	 * @param int $flags Bitfield; WRITE_BACKGROUND selects buffered writes
	 * @return bool
	 */
	protected function doSetMulti( array $data, $exptime = 0, $flags = 0 ) {
		$this->debug( 'setMulti(' . implode( ', ', array_keys( $data ) ) . ')' );

		$exptime = $this->fixExpiry( $exptime );
		foreach ( array_keys( $data ) as $key ) {
			$this->validateKeyEncoding( $key );
		}

		// The PECL implementation is a naïve for-loop so use async I/O to pipeline;
		// https://github.com/php-memcached-dev/php-memcached/blob/7443d16d02fb73cdba2e90ae282446f80969229c/php_memcached.c#L1852
		$background = ( $flags & self::WRITE_BACKGROUND ) == self::WRITE_BACKGROUND;
		$client = $background ? $this->acquireAsyncClient() : $this->acquireSyncClient();
		try {
			$result = $client->setMulti( $data, $exptime );
			// Inspect the handle that performed the write, and do so before its
			// options are switched back
			$result = $this->checkClientResult( $client, false, $result );
		} finally {
			if ( $background ) {
				// Always unlock the main client, even if the write threw
				$this->releaseAsyncClient( $client );
			}
		}

		return $result;
	}

	/**
	 * Delete several keys at once.
	 *
	 * @param string[] $keys
	 * @param int $flags Bitfield; WRITE_BACKGROUND selects buffered writes
	 * @return bool
	 */
	protected function doDeleteMulti( array $keys, $flags = 0 ) {
		$this->debug( 'deleteMulti(' . implode( ', ', $keys ) . ')' );

		foreach ( $keys as $key ) {
			$this->validateKeyEncoding( $key );
		}

		// The PECL implementation is a naïve for-loop so use async I/O to pipeline;
		// https://github.com/php-memcached-dev/php-memcached/blob/7443d16d02fb73cdba2e90ae282446f80969229c/php_memcached.c#L1852
		$background = ( $flags & self::WRITE_BACKGROUND ) == self::WRITE_BACKGROUND;
		$client = $background ? $this->acquireAsyncClient() : $this->acquireSyncClient();
		try {
			$resultArray = $client->deleteMulti( $keys ) ?: [];

			$result = true;
			foreach ( $resultArray as $code ) {
				if ( !in_array( $code, [ true, Memcached::RES_NOTFOUND ], true ) ) {
					// "Not found" is counted as success in our interface
					$result = false;
				}
			}

			// Inspect the handle that performed the deletes, and do so before
			// its options are switched back
			$result = $this->checkClientResult( $client, false, $result );
		} finally {
			if ( $background ) {
				// Always unlock the main client, even if the delete threw
				$this->releaseAsyncClient( $client );
			}
		}

		return $result;
	}

	/**
	 * Change the expiry of an existing key.
	 *
	 * @param string $key
	 * @param int $exptime
	 * @param int $flags Not used by this implementation
	 * @return bool
	 */
	protected function doChangeTTL( $key, $exptime, $flags ) {
		$this->debug( "touch($key)" );

		// Validate the key like every other single-key operation does
		$result = $this->acquireSyncClient()->touch(
			$this->validateKeyEncoding( $key ),
			$this->fixExpiry( $exptime )
		);

		return $this->checkResult( $key, $result );
	}

	/**
	 * Serialize a value with the serializer configured on the main client.
	 *
	 * @param mixed $value
	 * @return string|int Integers are returned as-is
	 * @throws UnexpectedValueException If the configured serializer is not handled
	 */
	protected function serialize( $value ) {
		if ( is_int( $value ) ) {
			return $value;
		}

		$serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );
		if ( $serializer === Memcached::SERIALIZER_PHP ) {
			return serialize( $value );
		} elseif ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
			return igbinary_serialize( $value );
		}

		throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
	}

	/**
	 * Unserialize a value with the serializer configured on the main client.
	 *
	 * @param string|int $value
	 * @return mixed Integer-like input is returned as an int
	 * @throws UnexpectedValueException If the configured serializer is not handled
	 */
	protected function unserialize( $value ) {
		if ( $this->isInteger( $value ) ) {
			return (int)$value;
		}

		$serializer = $this->syncClient->getOption( Memcached::OPT_SERIALIZER );
		if ( $serializer === Memcached::SERIALIZER_PHP ) {
			return unserialize( $value );
		} elseif ( $serializer === Memcached::SERIALIZER_IGBINARY ) {
			return igbinary_unserialize( $value );
		}

		throw new UnexpectedValueException( __METHOD__ . ": got serializer '$serializer'." );
	}

	/**
	 * Get the main client for an unbuffered operation, flushing any
	 * previously buffered writes first.
	 *
	 * @return Memcached
	 * @throws RuntimeException If the main client is currently buffering writes
	 */
	private function acquireSyncClient() {
		if ( $this->syncClientIsBuffering ) {
			throw new RuntimeException( "The main (unbuffered I/O) client is locked" );
		}

		if ( $this->hasUnflushedChanges ) {
			// Force a synchronous flush of async writes so that their changes are visible
			$this->syncClient->fetch();
			if ( $this->asyncClient ) {
				$this->asyncClient->fetch();
			}
			$this->hasUnflushedChanges = false;
		}

		return $this->syncClient;
	}

	/**
	 * Get a client that buffers writes. Must be paired with releaseAsyncClient().
	 *
	 * @return Memcached
	 */
	private function acquireAsyncClient() {
		if ( $this->asyncClient ) {
			return $this->asyncClient; // dedicated buffering instance
		}

		// Modify the main instance to temporarily buffer writes
		$this->syncClientIsBuffering = true;
		$this->syncClient->setOptions( self::$OPTS_ASYNC_WRITES );

		return $this->syncClient;
	}

	/**
	 * Hand back a client obtained from acquireAsyncClient() and remember that
	 * buffered writes may still be pending.
	 *
	 * @param Memcached $client
	 */
	private function releaseAsyncClient( $client ) {
		$this->hasUnflushedChanges = true;

		if ( !$this->asyncClient ) {
			// This is the main instance; make it stop buffering writes again
			$client->setOptions( self::$OPTS_SYNC_WRITES );
			$this->syncClientIsBuffering = false;
		}
	}
}