X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fmemcached-client.php;h=2eddb908f83b647d8fb6d6d522d2ff71bef14431;hb=72f3df40d2ed32e8d7af94b958210aa57b1152d1;hp=934ca7680fc2f35997fb4fd9e14eaaf14f32dee2;hpb=5a363c1dd81a2c535be1d7d60108b38f4b6b2aa7;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/memcached-client.php b/includes/memcached-client.php index 934ca7680f..2eddb908f8 100644 --- a/includes/memcached-client.php +++ b/includes/memcached-client.php @@ -43,9 +43,9 @@ * Usage example: * * require_once 'memcached.php'; - * + * * $mc = new memcached(array( - * 'servers' => array('127.0.0.1:10000', + * 'servers' => array('127.0.0.1:10000', * array('192.0.0.1:10010', 2), * '127.0.0.1:10020'), * 'debug' => false, @@ -91,7 +91,7 @@ define("COMPRESSION_SAVINGS", 0.20); * memcached client class implemented using (p)fsockopen() * * @author Ryan T. Dean - * @package memcached-client + * @addtogroup Cache */ class memcached { @@ -105,7 +105,7 @@ class memcached * @access public */ var $stats; - + // }}} // {{{ private @@ -116,7 +116,7 @@ class memcached * @access private */ var $_cache_sock; - + /** * Current debug status; 0 - none to 9 - profiling * @@ -124,7 +124,7 @@ class memcached * @access private */ var $_debug; - + /** * Dead hosts, assoc array, 'host'=>'unixtime when ok to check again' * @@ -132,7 +132,7 @@ class memcached * @access private */ var $_host_dead; - + /** * Is compression available? * @@ -140,7 +140,7 @@ class memcached * @access private */ var $_have_zlib; - + /** * Do we want to use compression? * @@ -148,15 +148,15 @@ class memcached * @access private */ var $_compress_enable; - + /** * At how many bytes should we compress? * - * @var interger + * @var integer * @access private */ var $_compress_threshold; - + /** * Are we using persistant links? * @@ -164,7 +164,7 @@ class memcached * @access private */ var $_persistant; - + /** * If only using one server; contains ip:port to connect to * @@ -172,7 +172,7 @@ class memcached * @access private */ var $_single_sock; - + /** * Array containing ip:port or array(ip:port, weight) * @@ -180,7 +180,7 @@ class memcached * @access private */ var $_servers; - + /** * Our bit buckets * @@ -188,23 +188,49 @@ class memcached * @access private */ var $_buckets; - + /** * Total # of bit buckets we have * - * @var interger + * @var integer * @access private */ var $_bucketcount; - + /** * # of total servers we have * - * @var interger + * @var integer * @access private */ var $_active; + /** + * Stream timeout in seconds. Applies for example to fread() + * + * @var integer + * @access private + */ + var $_timeout_seconds; + + /** + * Stream timeout in microseconds + * + * @var integer + * @access private + */ + var $_timeout_microseconds; + + /** + * Connect timeout in seconds + */ + var $_connect_timeout; + + /** + * Number of connection attempts for each server + */ + var $_connect_attempts; + // }}} // }}} // {{{ methods @@ -228,21 +254,27 @@ class memcached $this->_persistant = array_key_exists('persistant', $args) ? (@$args['persistant']) : false; $this->_compress_enable = true; $this->_have_zlib = function_exists("gzcompress"); - + $this->_cache_sock = array(); $this->_host_dead = array(); + + $this->_timeout_seconds = 1; + $this->_timeout_microseconds = 0; + + $this->_connect_timeout = 0.01; + $this->_connect_attempts = 3; } // }}} // {{{ add() /** - * Adds a key/value to the memcache server if one isn't already set with + * Adds a key/value to the memcache server if one isn't already set with * that key * - * @param string $key Key to set with data - * @param mixed $val Value to store - * @param interger $exp (optional) Time to expire data at + * @param string $key Key to set with data + * @param mixed $val Value to store + * @param integer $exp (optional) Time to expire data at * * @return boolean * @access public @@ -259,7 +291,7 @@ class memcached * Decriment a value stored on the memcache server * * @param string $key Key to decriment - * @param interger $amt (optional) Amount to decriment + * @param integer $amt (optional) Amount to decriment * * @return mixed FALSE on failure, value on success * @access public @@ -276,7 +308,7 @@ class memcached * Deletes a key from the server, optionally after $time * * @param string $key Key to delete - * @param interger $time (optional) How long to wait before deleting + * @param integer $time (optional) How long to wait before deleting * * @return boolean TRUE on success, FALSE on failure * @access public @@ -285,25 +317,25 @@ class memcached { if (!$this->_active) return false; - + $sock = $this->get_sock($key); if (!is_resource($sock)) return false; - + $key = is_array($key) ? $key[1] : $key; - - $this->stats['delete']++; + + @$this->stats['delete']++; $cmd = "delete $key $time\r\n"; - if(!fwrite($sock, $cmd, strlen($cmd))) + if(!$this->_safe_fwrite($sock, $cmd, strlen($cmd))) { $this->_dead_sock($sock); return false; } $res = trim(fgets($sock)); - + if ($this->_debug) $this->_debugprint(sprintf("MemCache: delete %s (%s)\n", $key, $res)); - + if ($res == "DELETED") return true; return false; @@ -366,30 +398,39 @@ class memcached */ function get ($key) { - if (!$this->_active) + $fname = 'memcached::get'; + wfProfileIn( $fname ); + + if (!$this->_active) { + wfProfileOut( $fname ); return false; - + } + $sock = $this->get_sock($key); - - if (!is_resource($sock)) + + if (!is_resource($sock)) { + wfProfileOut( $fname ); return false; - + } + @$this->stats['get']++; - + $cmd = "get $key\r\n"; - if (!fwrite($sock, $cmd, strlen($cmd))) + if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd))) { $this->_dead_sock($sock); + wfProfileOut( $fname ); return false; } - + $val = array(); $this->_load_items($sock, $val); - + if ($this->_debug) foreach ($val as $k => $v) $this->_debugprint(@sprintf("MemCache: sock %s got %s => %s\r\n", serialize($sock), $k, $v)); + wfProfileOut( $fname ); return @$val[$key]; } @@ -408,8 +449,9 @@ class memcached { if (!$this->_active) return false; - + $this->stats['get_multi']++; + $sock_keys = array(); foreach ($keys as $key) { @@ -423,7 +465,7 @@ class memcached } $sock_keys[$sock][] = $key; } - + // Send out the requests foreach ($socks as $sock) { @@ -433,8 +475,8 @@ class memcached $cmd .= " ". $key; } $cmd .= "\r\n"; - - if (fwrite($sock, $cmd, strlen($cmd))) + + if ($this->_safe_fwrite($sock, $cmd, strlen($cmd))) { $gather[] = $sock; } else @@ -442,18 +484,18 @@ class memcached $this->_dead_sock($sock); } } - + // Parse responses $val = array(); foreach ($gather as $sock) { $this->_load_items($sock, $val); } - + if ($this->_debug) foreach ($val as $k => $v) $this->_debugprint(sprintf("MemCache: got %s => %s\r\n", $k, $v)); - + return $val; } @@ -464,9 +506,9 @@ class memcached * Increments $key (optionally) by $amt * * @param string $key Key to increment - * @param interger $amt (optional) amount to increment + * @param integer $amt (optional) amount to increment * - * @return interger New key value? + * @return integer New key value? * @access public */ function incr ($key, $amt=1) @@ -482,7 +524,7 @@ class memcached * * @param string $key Key to set value as * @param mixed $value Value to store - * @param interger $exp (optional) Experiation time + * @param integer $exp (optional) Experiation time * * @return boolean * @access public @@ -496,7 +538,7 @@ class memcached // {{{ run_command() /** - * Passes through $cmd to the memcache server connected by $sock; returns + * Passes through $cmd to the memcache server connected by $sock; returns * output as an array (null array if no output) * * NOTE: due to a possible bug in how PHP reads while using fgets(), each @@ -515,10 +557,10 @@ class memcached { if (!is_resource($sock)) return array(); - - if (!fwrite($sock, $cmd, strlen($cmd))) + + if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd))) return array(); - + while (true) { $res = fgets($sock); @@ -540,7 +582,7 @@ class memcached * * @param string $key Key to set value as * @param mixed $value Value to set - * @param interger $exp (optional) Experiation time + * @param integer $exp (optional) Experiation time * * @return boolean TRUE on success * @access public @@ -556,7 +598,7 @@ class memcached /** * Sets the compression threshold * - * @param interger $thresh Threshold to compress if larger than + * @param integer $thresh Threshold to compress if larger than * * @access public */ @@ -600,12 +642,26 @@ class memcached $this->_active = count($list); $this->_buckets = null; $this->_bucketcount = 0; - + $this->_single_sock = null; if ($this->_active == 1) $this->_single_sock = $this->_servers[0]; } + /** + * Sets the timeout for new connections + * + * @param integer $seconds Number of seconds + * @param integer $microseconds Number of microseconds + * + * @access public + */ + function set_timeout ($seconds, $microseconds) + { + $this->_timeout_seconds = $seconds; + $this->_timeout_microseconds = $microseconds; + } + // }}} // }}} // {{{ private methods @@ -631,26 +687,45 @@ class memcached /** * Connects $sock to $host, timing out after $timeout * - * @param interger $sock Socket to connect + * @param integer $sock Socket to connect * @param string $host Host:IP to connect to - * @param float $timeout (optional) Timeout value, defaults to 0.25s * * @return boolean * @access private */ - function _connect_sock (&$sock, $host, $timeout = 0.25) + function _connect_sock (&$sock, $host) { list ($ip, $port) = explode(":", $host); - if ($this->_persistant == 1) - { - $sock = @pfsockopen($ip, $port, $errno, $errstr, $timeout); - } else - { - $sock = @fsockopen($ip, $port, $errno, $errstr, $timeout); + $sock = false; + $timeout = $this->_connect_timeout; + $errno = $errstr = null; + for ($i = 0; !$sock && $i < $this->_connect_attempts; $i++) { + if ($i > 0) { + # Sleep until the timeout, in case it failed fast + $elapsed = microtime(true) - $t; + if ( $elapsed < $timeout ) { + usleep(($timeout - $elapsed) * 1e6); + } + $timeout *= 2; + } + $t = microtime(true); + if ($this->_persistant == 1) + { + $sock = @pfsockopen($ip, $port, $errno, $errstr, $timeout); + } else + { + $sock = @fsockopen($ip, $port, $errno, $errstr, $timeout); + } } - - if (!$sock) + if (!$sock) { + if ($this->_debug) + $this->_debugprint( "Error connecting to $host: $errstr\n" ); return false; + } + + // Initialise timeout + stream_set_timeout($sock, $this->_timeout_seconds, $this->_timeout_microseconds); + return true; } @@ -667,7 +742,7 @@ class memcached function _dead_sock ($sock) { $host = array_search($sock, $this->_cache_sock); - list ($ip, $port) = explode(":", $host); + @list ($ip, /* $port */) = explode(":", $host); $this->_host_dead[$ip] = time() + 30 + intval(rand(0, 10)); $this->_host_dead[$host] = $this->_host_dead[$ip]; unset($this->_cache_sock[$host]); @@ -689,11 +764,13 @@ class memcached if (!$this->_active) return false; - if ($this->_single_sock !== null) + if ($this->_single_sock !== null) { + $this->_flush_read_buffer($this->_single_sock); return $this->sock_to_host($this->_single_sock); - + } + $hv = is_array($key) ? intval($key[0]) : $this->_hashfunc($key); - + if ($this->_buckets === null) { foreach ($this->_servers as $v) @@ -710,17 +787,19 @@ class memcached $this->_buckets = $bu; $this->_bucketcount = count($bu); } - + $realkey = is_array($key) ? $key[1] : $key; for ($tries = 0; $tries<20; $tries++) { $host = $this->_buckets[$hv % $this->_bucketcount]; $sock = $this->sock_to_host($host); - if (is_resource($sock)) + if (is_resource($sock)) { + $this->_flush_read_buffer($sock); return $sock; + } $hv += $this->_hashfunc($tries . $realkey); } - + return false; } @@ -728,22 +807,19 @@ class memcached // {{{ _hashfunc() /** - * Creates a hash interger based on the $key + * Creates a hash integer based on the $key * * @param string $key Key to hash * - * @return interger Hash value + * @return integer Hash value * @access private */ function _hashfunc ($key) { - $hash = 0; - for ($i=0; $i_active) return null; - + $sock = $this->get_sock($key); if (!is_resource($sock)) return null; - + $key = is_array($key) ? $key[1] : $key; - $this->stats[$cmd]++; - if (!fwrite($sock, "$cmd $key $amt\r\n")) + @$this->stats[$cmd]++; + if (!$this->_safe_fwrite($sock, "$cmd $key $amt\r\n")) return $this->_dead_sock($sock); - + stream_set_timeout($sock, 1, 0); $line = fgets($sock); + $match = array(); if (!preg_match('/^(\d+)/', $line, $match)) return null; return $match[1]; @@ -804,7 +881,7 @@ class memcached list($rkey, $flags, $len) = array($match[1], $match[2], $match[3]); $bneed = $len+2; $offset = 0; - + while ($bneed > 0) { $data = fread($sock, $bneed); @@ -815,7 +892,7 @@ class memcached $bneed -= $n; @$ret[$rkey] .= $data; } - + if ($offset != $len+2) { // Something is borked! @@ -826,16 +903,16 @@ class memcached $this->_close_sock($sock); return false; } - - $ret[$rkey] = rtrim($ret[$rkey]); if ($this->_have_zlib && $flags & MEMCACHE_COMPRESSED) $ret[$rkey] = gzuncompress($ret[$rkey]); + $ret[$rkey] = rtrim($ret[$rkey]); + if ($flags & MEMCACHE_SERIALIZED) $ret[$rkey] = unserialize($ret[$rkey]); - } else + } else { $this->_debugprint("Error parsing memcached response\n"); return 0; @@ -852,7 +929,7 @@ class memcached * @param string $cmd Command to perform * @param string $key Key to act on * @param mixed $val What we need to store - * @param interger $exp When it should expire + * @param integer $exp When it should expire * * @return boolean * @access private @@ -861,15 +938,15 @@ class memcached { if (!$this->_active) return false; - + $sock = $this->get_sock($key); if (!is_resource($sock)) return false; - + @$this->stats[$cmd]++; - + $flags = 0; - + if (!is_scalar($val)) { $val = serialize($val); @@ -877,16 +954,16 @@ class memcached if ($this->_debug) $this->_debugprint(sprintf("client: serializing data as it is not scalar\n")); } - + $len = strlen($val); - - if ($this->_have_zlib && $this->_compress_enable && + + if ($this->_have_zlib && $this->_compress_enable && $this->_compress_threshold && $len >= $this->_compress_threshold) { $c_val = gzcompress($val, 9); $c_len = strlen($c_val); - - if ($c_len < $len*(1 - COMPRESS_SAVINGS)) + + if ($c_len < $len*(1 - COMPRESSION_SAVINGS)) { if ($this->_debug) $this->_debugprint(sprintf("client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len)); @@ -895,11 +972,11 @@ class memcached $flags |= MEMCACHE_COMPRESSED; } } - if (!fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n")) + if (!$this->_safe_fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n")) return $this->_dead_sock($sock); - + $line = trim(fgets($sock)); - + if ($this->_debug) { if ($flags & MEMCACHE_COMPRESSED) @@ -926,21 +1003,22 @@ class memcached { if (isset($this->_cache_sock[$host])) return $this->_cache_sock[$host]; - + + $sock = null; $now = time(); - list ($ip, $port) = explode (":", $host); + list ($ip, /* $port */) = explode (":", $host); if (isset($this->_host_dead[$host]) && $this->_host_dead[$host] > $now || isset($this->_host_dead[$ip]) && $this->_host_dead[$ip] > $now) return null; - + if (!$this->_connect_sock($sock, $host)) return $this->_dead_sock($host); - + // Do not buffer writes stream_set_write_buffer($sock, 0); - + $this->_cache_sock[$host] = $sock; - + return $this->_cache_sock[$host]; } @@ -948,10 +1026,63 @@ class memcached print($str); } + /** + * Write to a stream, timing out after the correct amount of time + * + * @return bool false on failure, true on success + */ + /* + function _safe_fwrite($f, $buf, $len = false) { + stream_set_blocking($f, 0); + + if ($len === false) { + wfDebug("Writing " . strlen( $buf ) . " bytes\n"); + $bytesWritten = fwrite($f, $buf); + } else { + wfDebug("Writing $len bytes\n"); + $bytesWritten = fwrite($f, $buf, $len); + } + $n = stream_select($r=NULL, $w = array($f), $e = NULL, 10, 0); + # $this->_timeout_seconds, $this->_timeout_microseconds); + + wfDebug("stream_select returned $n\n"); + stream_set_blocking($f, 1); + return $n == 1; + return $bytesWritten; + }*/ + + /** + * Original behaviour + */ + function _safe_fwrite($f, $buf, $len = false) { + if ($len === false) { + $bytesWritten = fwrite($f, $buf); + } else { + $bytesWritten = fwrite($f, $buf, $len); + } + return $bytesWritten; + } + + /** + * Flush the read buffer of a stream + */ + function _flush_read_buffer($f) { + if (!is_resource($f)) { + return; + } + $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0); + while ($n == 1 && !feof($f)) { + fread($f, 1024); + $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0); + } + } + // }}} // }}} // }}} } +// vim: sts=3 sw=3 et + // }}} -?> +