* Usage example:
*
* require_once 'memcached.php';
- *
+ *
* $mc = new memcached(array(
- * 'servers' => array('127.0.0.1:10000',
+ * 'servers' => array('127.0.0.1:10000',
* array('192.0.0.1:10010', 2),
* '127.0.0.1:10020'),
* 'debug' => false,
* memcached client class implemented using (p)fsockopen()
*
* @author Ryan T. Dean <rtdean@cytherianage.net>
- * @package memcached-client
+ * @addtogroup Cache
*/
class memcached
{
* @access public
*/
var $stats;
-
+
// }}}
// {{{ private
* @access private
*/
var $_cache_sock;
-
+
/**
* Current debug status; 0 - none to 9 - profiling
*
* @access private
*/
var $_debug;
-
+
/**
* Dead hosts, assoc array, 'host'=>'unixtime when ok to check again'
*
* @access private
*/
var $_host_dead;
-
+
/**
* Is compression available?
*
* @access private
*/
var $_have_zlib;
-
+
/**
* Do we want to use compression?
*
* @access private
*/
var $_compress_enable;
-
+
/**
* At how many bytes should we compress?
*
- * @var interger
+ * @var integer
* @access private
*/
var $_compress_threshold;
-
+
/**
* Are we using persistant links?
*
* @access private
*/
var $_persistant;
-
+
/**
* If only using one server; contains ip:port to connect to
*
* @access private
*/
var $_single_sock;
-
+
/**
* Array containing ip:port or array(ip:port, weight)
*
* @access private
*/
var $_servers;
-
+
/**
* Our bit buckets
*
* @access private
*/
var $_buckets;
-
+
/**
* Total # of bit buckets we have
*
- * @var interger
+ * @var integer
* @access private
*/
var $_bucketcount;
-
+
/**
* # of total servers we have
*
- * @var interger
+ * @var integer
* @access private
*/
var $_active;
+ /**
+ * Stream timeout in seconds. Applies for example to fread()
+ *
+ * @var integer
+ * @access private
+ */
+ var $_timeout_seconds;
+
+ /**
+ * Stream timeout in microseconds
+ *
+ * @var integer
+ * @access private
+ */
+ var $_timeout_microseconds;
+
+ /**
+ * Connect timeout in seconds
+ */
+ var $_connect_timeout;
+
+ /**
+ * Number of connection attempts for each server
+ */
+ var $_connect_attempts;
+
// }}}
// }}}
// {{{ methods
$this->_persistant = array_key_exists('persistant', $args) ? (@$args['persistant']) : false;
$this->_compress_enable = true;
$this->_have_zlib = function_exists("gzcompress");
-
+
$this->_cache_sock = array();
$this->_host_dead = array();
+
+ $this->_timeout_seconds = 1;
+ $this->_timeout_microseconds = 0;
+
+ $this->_connect_timeout = 0.01;
+ $this->_connect_attempts = 3;
}
// }}}
// {{{ add()
/**
- * Adds a key/value to the memcache server if one isn't already set with
+ * Adds a key/value to the memcache server if one isn't already set with
* that key
*
- * @param string $key Key to set with data
- * @param mixed $val Value to store
- * @param interger $exp (optional) Time to expire data at
+ * @param string $key Key to set with data
+ * @param mixed $val Value to store
+ * @param integer $exp (optional) Time to expire data at
*
* @return boolean
* @access public
* Decriment a value stored on the memcache server
*
* @param string $key Key to decriment
- * @param interger $amt (optional) Amount to decriment
+ * @param integer $amt (optional) Amount to decriment
*
* @return mixed FALSE on failure, value on success
* @access public
* Deletes a key from the server, optionally after $time
*
* @param string $key Key to delete
- * @param interger $time (optional) How long to wait before deleting
+ * @param integer $time (optional) How long to wait before deleting
*
* @return boolean TRUE on success, FALSE on failure
* @access public
{
if (!$this->_active)
return false;
-
+
$sock = $this->get_sock($key);
if (!is_resource($sock))
return false;
-
+
$key = is_array($key) ? $key[1] : $key;
-
+
@$this->stats['delete']++;
$cmd = "delete $key $time\r\n";
- if(!fwrite($sock, $cmd, strlen($cmd)))
+ if(!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
{
$this->_dead_sock($sock);
return false;
}
$res = trim(fgets($sock));
-
+
if ($this->_debug)
$this->_debugprint(sprintf("MemCache: delete %s (%s)\n", $key, $res));
-
+
if ($res == "DELETED")
return true;
return false;
*/
function get ($key)
{
- if (!$this->_active)
+ $fname = 'memcached::get';
+ wfProfileIn( $fname );
+
+ if (!$this->_active) {
+ wfProfileOut( $fname );
return false;
-
+ }
+
$sock = $this->get_sock($key);
-
- if (!is_resource($sock))
+
+ if (!is_resource($sock)) {
+ wfProfileOut( $fname );
return false;
-
+ }
+
@$this->stats['get']++;
-
+
$cmd = "get $key\r\n";
- if (!fwrite($sock, $cmd, strlen($cmd)))
+ if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
{
$this->_dead_sock($sock);
+ wfProfileOut( $fname );
return false;
}
-
+
$val = array();
$this->_load_items($sock, $val);
-
+
if ($this->_debug)
foreach ($val as $k => $v)
$this->_debugprint(@sprintf("MemCache: sock %s got %s => %s\r\n", serialize($sock), $k, $v));
+ wfProfileOut( $fname );
return @$val[$key];
}
{
if (!$this->_active)
return false;
-
+
$this->stats['get_multi']++;
+ $sock_keys = array();
foreach ($keys as $key)
{
}
$sock_keys[$sock][] = $key;
}
-
+
// Send out the requests
foreach ($socks as $sock)
{
$cmd .= " ". $key;
}
$cmd .= "\r\n";
-
- if (fwrite($sock, $cmd, strlen($cmd)))
+
+ if ($this->_safe_fwrite($sock, $cmd, strlen($cmd)))
{
$gather[] = $sock;
} else
$this->_dead_sock($sock);
}
}
-
+
// Parse responses
$val = array();
foreach ($gather as $sock)
{
$this->_load_items($sock, $val);
}
-
+
if ($this->_debug)
foreach ($val as $k => $v)
$this->_debugprint(sprintf("MemCache: got %s => %s\r\n", $k, $v));
-
+
return $val;
}
* Increments $key (optionally) by $amt
*
* @param string $key Key to increment
- * @param interger $amt (optional) amount to increment
+ * @param integer $amt (optional) amount to increment
*
- * @return interger New key value?
+ * @return integer New key value?
* @access public
*/
function incr ($key, $amt=1)
*
* @param string $key Key to set value as
* @param mixed $value Value to store
- * @param interger $exp (optional) Experiation time
+ * @param integer $exp (optional) Experiation time
*
* @return boolean
* @access public
// {{{ run_command()
/**
- * Passes through $cmd to the memcache server connected by $sock; returns
+ * Passes through $cmd to the memcache server connected by $sock; returns
* output as an array (null array if no output)
*
* NOTE: due to a possible bug in how PHP reads while using fgets(), each
{
if (!is_resource($sock))
return array();
-
- if (!fwrite($sock, $cmd, strlen($cmd)))
+
+ if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
return array();
-
+
while (true)
{
$res = fgets($sock);
*
* @param string $key Key to set value as
* @param mixed $value Value to set
- * @param interger $exp (optional) Experiation time
+ * @param integer $exp (optional) Experiation time
*
* @return boolean TRUE on success
* @access public
/**
* Sets the compression threshold
*
- * @param interger $thresh Threshold to compress if larger than
+ * @param integer $thresh Threshold to compress if larger than
*
* @access public
*/
$this->_active = count($list);
$this->_buckets = null;
$this->_bucketcount = 0;
-
+
$this->_single_sock = null;
if ($this->_active == 1)
$this->_single_sock = $this->_servers[0];
}
+ /**
+ * Sets the timeout for new connections
+ *
+ * @param integer $seconds Number of seconds
+ * @param integer $microseconds Number of microseconds
+ *
+ * @access public
+ */
+ function set_timeout ($seconds, $microseconds)
+ {
+ $this->_timeout_seconds = $seconds;
+ $this->_timeout_microseconds = $microseconds;
+ }
+
// }}}
// }}}
// {{{ private methods
/**
* Connects $sock to $host, timing out after $timeout
*
- * @param interger $sock Socket to connect
+ * @param integer $sock Socket to connect
* @param string $host Host:IP to connect to
- * @param float $timeout (optional) Timeout value, defaults to 0.25s
*
* @return boolean
* @access private
*/
- function _connect_sock (&$sock, $host, $timeout = 0.25)
+ function _connect_sock (&$sock, $host)
{
list ($ip, $port) = explode(":", $host);
- if ($this->_persistant == 1)
- {
- $sock = @pfsockopen($ip, $port, $errno, $errstr, $timeout);
- } else
- {
- $sock = @fsockopen($ip, $port, $errno, $errstr, $timeout);
+ $sock = false;
+ $timeout = $this->_connect_timeout;
+ $errno = $errstr = null;
+ for ($i = 0; !$sock && $i < $this->_connect_attempts; $i++) {
+ if ($i > 0) {
+ # Sleep until the timeout, in case it failed fast
+ $elapsed = microtime(true) - $t;
+ if ( $elapsed < $timeout ) {
+ usleep(($timeout - $elapsed) * 1e6);
+ }
+ $timeout *= 2;
+ }
+ $t = microtime(true);
+ if ($this->_persistant == 1)
+ {
+ $sock = @pfsockopen($ip, $port, $errno, $errstr, $timeout);
+ } else
+ {
+ $sock = @fsockopen($ip, $port, $errno, $errstr, $timeout);
+ }
}
-
- if (!$sock)
+ if (!$sock) {
+ if ($this->_debug)
+ $this->_debugprint( "Error connecting to $host: $errstr\n" );
return false;
+ }
+
+ // Initialise timeout
+ stream_set_timeout($sock, $this->_timeout_seconds, $this->_timeout_microseconds);
+
return true;
}
function _dead_sock ($sock)
{
$host = array_search($sock, $this->_cache_sock);
- list ($ip, $port) = explode(":", $host);
+ @list ($ip, /* $port */) = explode(":", $host);
$this->_host_dead[$ip] = time() + 30 + intval(rand(0, 10));
$this->_host_dead[$host] = $this->_host_dead[$ip];
unset($this->_cache_sock[$host]);
if (!$this->_active)
return false;
- if ($this->_single_sock !== null)
+ if ($this->_single_sock !== null) {
+ $this->_flush_read_buffer($this->_single_sock);
return $this->sock_to_host($this->_single_sock);
-
+ }
+
$hv = is_array($key) ? intval($key[0]) : $this->_hashfunc($key);
-
+
if ($this->_buckets === null)
{
foreach ($this->_servers as $v)
$this->_buckets = $bu;
$this->_bucketcount = count($bu);
}
-
+
$realkey = is_array($key) ? $key[1] : $key;
for ($tries = 0; $tries<20; $tries++)
{
$host = $this->_buckets[$hv % $this->_bucketcount];
$sock = $this->sock_to_host($host);
- if (is_resource($sock))
+ if (is_resource($sock)) {
+ $this->_flush_read_buffer($sock);
return $sock;
+ }
$hv += $this->_hashfunc($tries . $realkey);
}
-
+
return false;
}
// {{{ _hashfunc()
/**
- * Creates a hash interger based on the $key
+ * Creates a hash integer based on the $key
*
* @param string $key Key to hash
*
- * @return interger Hash value
+ * @return integer Hash value
* @access private
*/
function _hashfunc ($key)
{
- $hash = 0;
- for ($i=0; $i<strlen($key); $i++)
- {
- $hash = $hash*33 + ord($key[$i]);
- }
-
- return $hash;
+ # Hash function must on [0,0x7ffffff]
+ # We take the first 31 bits of the MD5 hash, which unlike the hash
+ # function used in a previous version of this client, works
+ return hexdec(substr(md5($key),0,8)) & 0x7fffffff;
}
// }}}
*
* @param string $cmd Command to perform
* @param string $key Key to perform it on
- * @param interger $amt Amount to adjust
+ * @param integer $amt Amount to adjust
*
- * @return interger New value of $key
+ * @return integer New value of $key
* @access private
*/
function _incrdecr ($cmd, $key, $amt=1)
{
if (!$this->_active)
return null;
-
+
$sock = $this->get_sock($key);
if (!is_resource($sock))
return null;
-
+
$key = is_array($key) ? $key[1] : $key;
- $this->stats[$cmd]++;
- if (!fwrite($sock, "$cmd $key $amt\r\n"))
+ @$this->stats[$cmd]++;
+ if (!$this->_safe_fwrite($sock, "$cmd $key $amt\r\n"))
return $this->_dead_sock($sock);
-
+
stream_set_timeout($sock, 1, 0);
$line = fgets($sock);
+ $match = array();
if (!preg_match('/^(\d+)/', $line, $match))
return null;
return $match[1];
list($rkey, $flags, $len) = array($match[1], $match[2], $match[3]);
$bneed = $len+2;
$offset = 0;
-
+
while ($bneed > 0)
{
$data = fread($sock, $bneed);
$bneed -= $n;
@$ret[$rkey] .= $data;
}
-
+
if ($offset != $len+2)
{
// Something is borked!
$this->_close_sock($sock);
return false;
}
-
- $ret[$rkey] = rtrim($ret[$rkey]);
if ($this->_have_zlib && $flags & MEMCACHE_COMPRESSED)
$ret[$rkey] = gzuncompress($ret[$rkey]);
+ $ret[$rkey] = rtrim($ret[$rkey]);
+
if ($flags & MEMCACHE_SERIALIZED)
$ret[$rkey] = unserialize($ret[$rkey]);
- } else
+ } else
{
$this->_debugprint("Error parsing memcached response\n");
return 0;
* @param string $cmd Command to perform
* @param string $key Key to act on
* @param mixed $val What we need to store
- * @param interger $exp When it should expire
+ * @param integer $exp When it should expire
*
* @return boolean
* @access private
{
if (!$this->_active)
return false;
-
+
$sock = $this->get_sock($key);
if (!is_resource($sock))
return false;
-
+
@$this->stats[$cmd]++;
-
+
$flags = 0;
-
+
if (!is_scalar($val))
{
$val = serialize($val);
if ($this->_debug)
$this->_debugprint(sprintf("client: serializing data as it is not scalar\n"));
}
-
+
$len = strlen($val);
-
- if ($this->_have_zlib && $this->_compress_enable &&
+
+ if ($this->_have_zlib && $this->_compress_enable &&
$this->_compress_threshold && $len >= $this->_compress_threshold)
{
$c_val = gzcompress($val, 9);
$c_len = strlen($c_val);
-
- if ($c_len < $len*(1 - COMPRESS_SAVINGS))
+
+ if ($c_len < $len*(1 - COMPRESSION_SAVINGS))
{
if ($this->_debug)
$this->_debugprint(sprintf("client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len));
$flags |= MEMCACHE_COMPRESSED;
}
}
- if (!fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n"))
+ if (!$this->_safe_fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n"))
return $this->_dead_sock($sock);
-
+
$line = trim(fgets($sock));
-
+
if ($this->_debug)
{
if ($flags & MEMCACHE_COMPRESSED)
{
if (isset($this->_cache_sock[$host]))
return $this->_cache_sock[$host];
-
+
+ $sock = null;
$now = time();
- list ($ip, $port) = explode (":", $host);
+ list ($ip, /* $port */) = explode (":", $host);
if (isset($this->_host_dead[$host]) && $this->_host_dead[$host] > $now ||
isset($this->_host_dead[$ip]) && $this->_host_dead[$ip] > $now)
return null;
-
+
if (!$this->_connect_sock($sock, $host))
return $this->_dead_sock($host);
-
+
// Do not buffer writes
stream_set_write_buffer($sock, 0);
-
+
$this->_cache_sock[$host] = $sock;
-
+
return $this->_cache_sock[$host];
}
print($str);
}
+ /**
+ * Write to a stream, timing out after the correct amount of time
+ *
+ * @return bool false on failure, true on success
+ */
+ /*
+ function _safe_fwrite($f, $buf, $len = false) {
+ stream_set_blocking($f, 0);
+
+ if ($len === false) {
+ wfDebug("Writing " . strlen( $buf ) . " bytes\n");
+ $bytesWritten = fwrite($f, $buf);
+ } else {
+ wfDebug("Writing $len bytes\n");
+ $bytesWritten = fwrite($f, $buf, $len);
+ }
+ $n = stream_select($r=NULL, $w = array($f), $e = NULL, 10, 0);
+ # $this->_timeout_seconds, $this->_timeout_microseconds);
+
+ wfDebug("stream_select returned $n\n");
+ stream_set_blocking($f, 1);
+ return $n == 1;
+ return $bytesWritten;
+ }*/
+
+ /**
+ * Original behaviour
+ */
+ function _safe_fwrite($f, $buf, $len = false) {
+ if ($len === false) {
+ $bytesWritten = fwrite($f, $buf);
+ } else {
+ $bytesWritten = fwrite($f, $buf, $len);
+ }
+ return $bytesWritten;
+ }
+
+ /**
+ * Flush the read buffer of a stream
+ */
+ function _flush_read_buffer($f) {
+ if (!is_resource($f)) {
+ return;
+ }
+ $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0);
+ while ($n == 1 && !feof($f)) {
+ fread($f, 1024);
+ $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0);
+ }
+ }
+
// }}}
// }}}
// }}}
}
+// vim: sts=3 sw=3 et
+
// }}}
-?>
+