X-Git-Url: http://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fmemcached-client.php;h=697509e869ca679f4d7b344312be4fcb2aceca2f;hb=af7cabcf2f24ca9a0d737559bd256a3fc742eabd;hp=363c610ab724bb81972ddc5ae31d4cd72c0b45d0;hpb=5d2e9f5fb60c63ae9115f398b2af8b1e831c36ba;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/memcached-client.php b/includes/memcached-client.php index 363c610ab7..697509e869 100644 --- a/includes/memcached-client.php +++ b/includes/memcached-client.php @@ -43,9 +43,9 @@ * Usage example: * * require_once 'memcached.php'; - * + * * $mc = new memcached(array( - * 'servers' => array('127.0.0.1:10000', + * 'servers' => array('127.0.0.1:10000', * array('192.0.0.1:10010', 2), * '127.0.0.1:10020'), * 'debug' => false, @@ -105,7 +105,7 @@ class memcached * @access public */ var $stats; - + // }}} // {{{ private @@ -116,7 +116,7 @@ class memcached * @access private */ var $_cache_sock; - + /** * Current debug status; 0 - none to 9 - profiling * @@ -124,7 +124,7 @@ class memcached * @access private */ var $_debug; - + /** * Dead hosts, assoc array, 'host'=>'unixtime when ok to check again' * @@ -132,7 +132,7 @@ class memcached * @access private */ var $_host_dead; - + /** * Is compression available? * @@ -140,7 +140,7 @@ class memcached * @access private */ var $_have_zlib; - + /** * Do we want to use compression? * @@ -148,7 +148,7 @@ class memcached * @access private */ var $_compress_enable; - + /** * At how many bytes should we compress? * @@ -156,7 +156,7 @@ class memcached * @access private */ var $_compress_threshold; - + /** * Are we using persistant links? * @@ -164,7 +164,7 @@ class memcached * @access private */ var $_persistant; - + /** * If only using one server; contains ip:port to connect to * @@ -172,7 +172,7 @@ class memcached * @access private */ var $_single_sock; - + /** * Array containing ip:port or array(ip:port, weight) * @@ -180,7 +180,7 @@ class memcached * @access private */ var $_servers; - + /** * Our bit buckets * @@ -188,7 +188,7 @@ class memcached * @access private */ var $_buckets; - + /** * Total # of bit buckets we have * @@ -196,7 +196,7 @@ class memcached * @access private */ var $_bucketcount; - + /** * # of total servers we have * @@ -205,6 +205,22 @@ class memcached */ var $_active; + /** + * Stream timeout in seconds. Applies for example to fread() + * + * @var integer + * @access private + */ + var $_timeout_seconds; + + /** + * Stream timeout in microseconds + * + * @var integer + * @access private + */ + var $_timeout_microseconds; + // }}} // }}} // {{{ methods @@ -221,23 +237,26 @@ class memcached */ function memcached ($args) { - $this->set_servers($args['servers']); - $this->_debug = $args['debug']; + $this->set_servers(@$args['servers']); + $this->_debug = @$args['debug']; $this->stats = array(); - $this->_compress_threshold = $args['compress_threshold']; - $this->_persistant = isset($args['persistant']) ? $args['persistant'] : false; + $this->_compress_threshold = @$args['compress_threshold']; + $this->_persistant = array_key_exists('persistant', $args) ? (@$args['persistant']) : false; $this->_compress_enable = true; $this->_have_zlib = function_exists("gzcompress"); - + $this->_cache_sock = array(); $this->_host_dead = array(); + + $this->_timeout_seconds = 1; + $this->_timeout_microseconds = 0; } // }}} // {{{ add() /** - * Adds a key/value to the memcache server if one isn't already set with + * Adds a key/value to the memcache server if one isn't already set with * that key * * @param string $key Key to set with data @@ -285,25 +304,25 @@ class memcached { if (!$this->_active) return false; - + $sock = $this->get_sock($key); if (!is_resource($sock)) return false; - + $key = is_array($key) ? $key[1] : $key; - - $this->stats['delete']++; + + @$this->stats['delete']++; $cmd = "delete $key $time\r\n"; - if(!fwrite($sock, $cmd, strlen($cmd))) + if(!$this->_safe_fwrite($sock, $cmd, strlen($cmd))) { $this->_dead_sock($sock); return false; } $res = trim(fgets($sock)); - + if ($this->_debug) - printf("MemCache: delete %s (%s)\n", $key, $res); - + $this->_debugprint(sprintf("MemCache: delete %s (%s)\n", $key, $res)); + if ($res == "DELETED") return true; return false; @@ -366,31 +385,40 @@ class memcached */ function get ($key) { - if (!$this->_active) + $fname = 'memcached::get'; + wfProfileIn( $fname ); + + if (!$this->_active) { + wfProfileOut( $fname ); return false; - + } + $sock = $this->get_sock($key); - - if (!is_resource($sock)) + + if (!is_resource($sock)) { + wfProfileOut( $fname ); return false; - - $this->stats['get']++; - + } + + @$this->stats['get']++; + $cmd = "get $key\r\n"; - if (!fwrite($sock, $cmd, strlen($cmd))) + if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd))) { $this->_dead_sock($sock); + wfProfileOut( $fname ); return false; } - + $val = array(); $this->_load_items($sock, $val); - + if ($this->_debug) foreach ($val as $k => $v) - printf("MemCache: sock %s got %s => %s\r\n", $sock, $k, $v); - - return $val[$key]; + $this->_debugprint(@sprintf("MemCache: sock %s got %s => %s\r\n", serialize($sock), $k, $v)); + + wfProfileOut( $fname ); + return @$val[$key]; } // }}} @@ -408,9 +436,9 @@ class memcached { if (!$this->_active) return false; - + $this->stats['get_multi']++; - + foreach ($keys as $key) { $sock = $this->get_sock($key); @@ -423,7 +451,7 @@ class memcached } $sock_keys[$sock][] = $key; } - + // Send out the requests foreach ($socks as $sock) { @@ -433,8 +461,8 @@ class memcached $cmd .= " ". $key; } $cmd .= "\r\n"; - - if (fwrite($sock, $cmd, strlen($cmd))) + + if ($this->_safe_fwrite($sock, $cmd, strlen($cmd))) { $gather[] = $sock; } else @@ -442,18 +470,18 @@ class memcached $this->_dead_sock($sock); } } - + // Parse responses $val = array(); foreach ($gather as $sock) { $this->_load_items($sock, $val); } - + if ($this->_debug) foreach ($val as $k => $v) - printf("MemCache: got %s => %s\r\n", $k, $v); - + $this->_debugprint(sprintf("MemCache: got %s => %s\r\n", $k, $v)); + return $val; } @@ -496,7 +524,7 @@ class memcached // {{{ run_command() /** - * Passes through $cmd to the memcache server connected by $sock; returns + * Passes through $cmd to the memcache server connected by $sock; returns * output as an array (null array if no output) * * NOTE: due to a possible bug in how PHP reads while using fgets(), each @@ -515,10 +543,10 @@ class memcached { if (!is_resource($sock)) return array(); - - if (!fwrite($sock, $cmd, strlen($cmd))) + + if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd))) return array(); - + while (true) { $res = fgets($sock); @@ -600,12 +628,26 @@ class memcached $this->_active = count($list); $this->_buckets = null; $this->_bucketcount = 0; - + $this->_single_sock = null; if ($this->_active == 1) $this->_single_sock = $this->_servers[0]; } + /** + * Sets the timeout for new connections + * + * @param integer $seconds Number of seconds + * @param integer $microseconds Number of microseconds + * + * @access public + */ + function set_timeout ($seconds, $microseconds) + { + $this->_timeout_seconds = $seconds; + $this->_timeout_microseconds = $microseconds; + } + // }}} // }}} // {{{ private methods @@ -648,9 +690,16 @@ class memcached { $sock = @fsockopen($ip, $port, $errno, $errstr, $timeout); } - - if (!$sock) + + if (!$sock) { + if ($this->_debug) + $this->_debugprint( "Error connecting to $host: $errstr\n" ); return false; + } + + // Initialise timeout + stream_set_timeout($sock, $this->_timeout_seconds, $this->_timeout_microseconds); + return true; } @@ -667,7 +716,7 @@ class memcached function _dead_sock ($sock) { $host = array_search($sock, $this->_cache_sock); - list ($ip, $port) = explode(":", $host); + @list ($ip, $port) = explode(":", $host); $this->_host_dead[$ip] = time() + 30 + intval(rand(0, 10)); $this->_host_dead[$host] = $this->_host_dead[$ip]; unset($this->_cache_sock[$host]); @@ -689,11 +738,13 @@ class memcached if (!$this->_active) return false; - if ($this->_single_sock !== null) + if ($this->_single_sock !== null) { + $this->_flush_read_buffer($this->_single_sock); return $this->sock_to_host($this->_single_sock); - + } + $hv = is_array($key) ? intval($key[0]) : $this->_hashfunc($key); - + if ($this->_buckets === null) { foreach ($this->_servers as $v) @@ -710,17 +761,19 @@ class memcached $this->_buckets = $bu; $this->_bucketcount = count($bu); } - + $realkey = is_array($key) ? $key[1] : $key; for ($tries = 0; $tries<20; $tries++) { $host = $this->_buckets[$hv % $this->_bucketcount]; $sock = $this->sock_to_host($host); - if (is_resource($sock)) + if (is_resource($sock)) { + $this->_flush_read_buffer($sock); return $sock; + } $hv += $this->_hashfunc($tries . $realkey); } - + return false; } @@ -737,13 +790,10 @@ class memcached */ function _hashfunc ($key) { - $hash = 0; - for ($i=0; $i_active) return null; - + $sock = $this->get_sock($key); if (!is_resource($sock)) return null; - + $key = is_array($key) ? $key[1] : $key; - $this->stats[$cmd]++; - if (!fwrite($sock, "$cmd $key $amt\r\n")) + @$this->stats[$cmd]++; + if (!$this->_safe_fwrite($sock, "$cmd $key $amt\r\n")) return $this->_dead_sock($sock); - + stream_set_timeout($sock, 1, 0); $line = fgets($sock); if (!preg_match('/^(\d+)/', $line, $match)) @@ -804,7 +854,7 @@ class memcached list($rkey, $flags, $len) = array($match[1], $match[2], $match[3]); $bneed = $len+2; $offset = 0; - + while ($bneed > 0) { $data = fread($sock, $bneed); @@ -813,31 +863,31 @@ class memcached break; $offset += $n; $bneed -= $n; - $ret[$rkey] .= $data; + @$ret[$rkey] .= $data; } - + if ($offset != $len+2) { // Something is borked! if ($this->_debug) - printf("Something is borked! key %s expecting %d got %d length\n", $rkey, $len+2, $offset); + $this->_debugprint(sprintf("Something is borked! key %s expecting %d got %d length\n", $rkey, $len+2, $offset)); unset($ret[$rkey]); $this->_close_sock($sock); return false; } - - $ret[$rkey] = rtrim($ret[$rkey]); if ($this->_have_zlib && $flags & MEMCACHE_COMPRESSED) $ret[$rkey] = gzuncompress($ret[$rkey]); + $ret[$rkey] = rtrim($ret[$rkey]); + if ($flags & MEMCACHE_SERIALIZED) $ret[$rkey] = unserialize($ret[$rkey]); - } else + } else { - print("Error parsing memcached response\n"); + $this->_debugprint("Error parsing memcached response\n"); return 0; } } @@ -861,50 +911,50 @@ class memcached { if (!$this->_active) return false; - + $sock = $this->get_sock($key); if (!is_resource($sock)) return false; - - $this->stats[$cmd]++; - + + @$this->stats[$cmd]++; + $flags = 0; - + if (!is_scalar($val)) { $val = serialize($val); $flags |= MEMCACHE_SERIALIZED; if ($this->_debug) - printf("client: serializing data as it is not scalar\n"); + $this->_debugprint(sprintf("client: serializing data as it is not scalar\n")); } - + $len = strlen($val); - - if ($this->_have_zlib && $this->_compress_enable && + + if ($this->_have_zlib && $this->_compress_enable && $this->_compress_threshold && $len >= $this->_compress_threshold) { $c_val = gzcompress($val, 9); $c_len = strlen($c_val); - - if ($c_len < $len*(1 - COMPRESS_SAVINGS)) + + if ($c_len < $len*(1 - COMPRESSION_SAVINGS)) { if ($this->_debug) - printf("client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len); + $this->_debugprint(sprintf("client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len)); $val = $c_val; $len = $c_len; $flags |= MEMCACHE_COMPRESSED; } } - if (!fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n")) + if (!$this->_safe_fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n")) return $this->_dead_sock($sock); - + $line = trim(fgets($sock)); - + if ($this->_debug) { if ($flags & MEMCACHE_COMPRESSED) $val = 'compressed data'; - printf("MemCache: %s %s => %s (%s)\n", $cmd, $key, $val, $line); + $this->_debugprint(sprintf("MemCache: %s %s => %s (%s)\n", $cmd, $key, $val, $line)); } if ($line == "STORED") return true; @@ -926,28 +976,85 @@ class memcached { if (isset($this->_cache_sock[$host])) return $this->_cache_sock[$host]; - + $now = time(); list ($ip, $port) = explode (":", $host); if (isset($this->_host_dead[$host]) && $this->_host_dead[$host] > $now || isset($this->_host_dead[$ip]) && $this->_host_dead[$ip] > $now) return null; - + if (!$this->_connect_sock($sock, $host)) return $this->_dead_sock($host); - + // Do not buffer writes stream_set_write_buffer($sock, 0); - + $this->_cache_sock[$host] = $sock; - + return $this->_cache_sock[$host]; } + function _debugprint($str){ + print($str); + } + + /** + * Write to a stream, timing out after the correct amount of time + * + * @return bool false on failure, true on success + */ + /* + function _safe_fwrite($f, $buf, $len = false) { + stream_set_blocking($f, 0); + + if ($len === false) { + wfDebug("Writing " . strlen( $buf ) . " bytes\n"); + $bytesWritten = fwrite($f, $buf); + } else { + wfDebug("Writing $len bytes\n"); + $bytesWritten = fwrite($f, $buf, $len); + } + $n = stream_select($r=NULL, $w = array($f), $e = NULL, 10, 0); + # $this->_timeout_seconds, $this->_timeout_microseconds); + + wfDebug("stream_select returned $n\n"); + stream_set_blocking($f, 1); + return $n == 1; + return $bytesWritten; + }*/ + + /** + * Original behaviour + */ + function _safe_fwrite($f, $buf, $len = false) { + if ($len === false) { + $bytesWritten = fwrite($f, $buf); + } else { + $bytesWritten = fwrite($f, $buf, $len); + } + return $bytesWritten; + } + + /** + * Flush the read buffer of a stream + */ + function _flush_read_buffer($f) { + if (!is_resource($f)) { + return; + } + $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0); + while ($n == 1 && !feof($f)) { + fread($f, 1024); + $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0); + } + } + // }}} // }}} // }}} } +// vim: sts=3 sw=3 et + // }}} ?>