* code formatting
[lhc/web/wiklou.git] / includes / memcached-client.php
index 2752180..bc7ce1c 100644 (file)
@@ -205,6 +205,22 @@ class memcached
     */
    var $_active;
 
+   /**
+    * Stream timeout in seconds. Applies for example to fread()
+    *
+    * @var     integer
+    * @access  private
+    */
+   var $_timeout_seconds;
+
+   /**
+    * Stream timeout in microseconds
+    *
+    * @var     integer
+    * @access  private
+    */
+   var $_timeout_microseconds;
+
    // }}}
    // }}}
    // {{{ methods
@@ -231,6 +247,9 @@ class memcached
       
       $this->_cache_sock = array();
       $this->_host_dead = array();
+
+      $this->_timeout_seconds = 1;
+      $this->_timeout_microseconds = 0;
    }
 
    // }}}
@@ -294,7 +313,7 @@ class memcached
       
       @$this->stats['delete']++;
       $cmd = "delete $key $time\r\n";
-      if(!fwrite($sock, $cmd, strlen($cmd)))
+      if(!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
       {
          $this->_dead_sock($sock);
          return false;
@@ -366,20 +385,28 @@ class memcached
     */
    function get ($key)
    {
-      if (!$this->_active)
+      $fname = 'memcached::get';
+      wfProfileIn( $fname );
+
+      if (!$this->_active) {
+        wfProfileOut( $fname );
          return false;
+      }
          
       $sock = $this->get_sock($key);
       
-      if (!is_resource($sock))
+      if (!is_resource($sock)) {
+        wfProfileOut( $fname );
          return false;
+      }
          
       @$this->stats['get']++;
       
       $cmd = "get $key\r\n";
-      if (!fwrite($sock, $cmd, strlen($cmd)))
+      if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
       {
          $this->_dead_sock($sock);
+        wfProfileOut( $fname );
          return false;
       }
       
@@ -390,6 +417,7 @@ class memcached
          foreach ($val as $k => $v)
             $this->_debugprint(@sprintf("MemCache: sock %s got %s => %s\r\n", serialize($sock), $k, $v));
 
+      wfProfileOut( $fname );
       return @$val[$key];
    }
 
@@ -434,7 +462,7 @@ class memcached
          }
          $cmd .= "\r\n";
          
-         if (fwrite($sock, $cmd, strlen($cmd)))
+         if ($this->_safe_fwrite($sock, $cmd, strlen($cmd)))
          {
             $gather[] = $sock;
          } else
@@ -516,7 +544,7 @@ class memcached
       if (!is_resource($sock))
          return array();
       
-      if (!fwrite($sock, $cmd, strlen($cmd)))
+      if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
          return array();
          
       while (true)
@@ -606,6 +634,20 @@ class memcached
          $this->_single_sock = $this->_servers[0];
    }
 
+   /**
+    * Sets the timeout for new connections
+    *
+    * @param   integer  $seconds Number of seconds
+    * @param   integer  $microseconds  Number of microseconds
+    * 
+    * @access  public
+    */
+   function set_timeout ($seconds, $microseconds)
+   {
+      $this->_timeout_seconds = $seconds;
+      $this->_timeout_microseconds = $microseconds;
+   }
+   
    // }}}
    // }}}
    // {{{ private methods
@@ -649,8 +691,15 @@ class memcached
          $sock = @fsockopen($ip, $port, $errno, $errstr, $timeout);
       }
       
-      if (!$sock)
+      if (!$sock) {
+         if ($this->_debug)
+            $this->_debugprint( "Error connecting to $host: $errstr\n" );
          return false;
+      }
+
+      // Initialise timeout
+      stream_set_timeout($sock, $this->_timeout_seconds, $this->_timeout_microseconds);
+
       return true;
    }
 
@@ -667,7 +716,7 @@ class memcached
    function _dead_sock ($sock)
    {
       $host = array_search($sock, $this->_cache_sock);
-      list ($ip, $port) = explode(":", $host);
+      @list ($ip, $port) = explode(":", $host);
       $this->_host_dead[$ip] = time() + 30 + intval(rand(0, 10));
       $this->_host_dead[$host] = $this->_host_dead[$ip];
       unset($this->_cache_sock[$host]);
@@ -689,8 +738,10 @@ class memcached
       if (!$this->_active)
          return false;
 
-      if ($this->_single_sock !== null)
+      if ($this->_single_sock !== null) {
+         $this->_flush_read_buffer($this->_single_sock);
          return $this->sock_to_host($this->_single_sock);
+      }
       
       $hv = is_array($key) ? intval($key[0]) : $this->_hashfunc($key);
       
@@ -716,8 +767,10 @@ class memcached
       {
          $host = $this->_buckets[$hv % $this->_bucketcount];
          $sock = $this->sock_to_host($host);
-         if (is_resource($sock))
+         if (is_resource($sock)) {
+            $this->_flush_read_buffer($sock);
             return $sock;
+         }
          $hv += $this->_hashfunc($tries . $realkey);
       }
       
@@ -737,7 +790,10 @@ class memcached
     */
    function _hashfunc ($key)
    {
-      return crc32($key);
+      # Hash function must on [0,0x7ffffff]
+      # We take the first 31 bits of the MD5 hash, which unlike the hash 
+      # function used in a previous version of this client, works
+      return hexdec(substr(md5($key),0,8)) & 0x7fffffff;
    }
 
    // }}}
@@ -763,8 +819,8 @@ class memcached
          return null;
          
       $key = is_array($key) ? $key[1] : $key;
-      $this->stats[$cmd]++;
-      if (!fwrite($sock, "$cmd $key $amt\r\n"))
+      @$this->stats[$cmd]++;
+      if (!$this->_safe_fwrite($sock, "$cmd $key $amt\r\n"))
          return $this->_dead_sock($sock);
          
       stream_set_timeout($sock, 1, 0);
@@ -821,11 +877,11 @@ class memcached
                return false;
             }
             
-            $ret[$rkey] = rtrim($ret[$rkey]);
-
             if ($this->_have_zlib && $flags & MEMCACHE_COMPRESSED)
                $ret[$rkey] = gzuncompress($ret[$rkey]);
 
+            $ret[$rkey] = rtrim($ret[$rkey]);
+
             if ($flags & MEMCACHE_SERIALIZED)
                $ret[$rkey] = unserialize($ret[$rkey]);
 
@@ -880,7 +936,7 @@ class memcached
          $c_val = gzcompress($val, 9);
          $c_len = strlen($c_val);
          
-         if ($c_len < $len*(1 - COMPRESS_SAVINGS))
+         if ($c_len < $len*(1 - COMPRESSION_SAVINGS))
          {
             if ($this->_debug)
                $this->_debugprint(sprintf("client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len));
@@ -889,7 +945,7 @@ class memcached
             $flags |= MEMCACHE_COMPRESSED;
          }
       }
-      if (!fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n"))
+      if (!$this->_safe_fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n"))
          return $this->_dead_sock($sock);
          
       $line = trim(fgets($sock));
@@ -942,10 +998,63 @@ class memcached
       print($str);
    }
 
+   /**
+    * Write to a stream, timing out after the correct amount of time
+    * 
+    * @return bool false on failure, true on success
+    */
+    /*
+   function _safe_fwrite($f, $buf, $len = false) {
+      stream_set_blocking($f, 0);
+
+      if ($len === false) {
+         wfDebug("Writing " . strlen( $buf ) . " bytes\n");
+         $bytesWritten = fwrite($f, $buf);
+      } else {
+         wfDebug("Writing $len bytes\n");
+         $bytesWritten = fwrite($f, $buf, $len);
+      }
+      $n = stream_select($r=NULL, $w = array($f), $e = NULL, 10, 0);
+      #   $this->_timeout_seconds, $this->_timeout_microseconds);
+
+      wfDebug("stream_select returned $n\n");
+      stream_set_blocking($f, 1);
+      return $n == 1;
+      return $bytesWritten;
+   }*/
+
+   /**
+    * Original behaviour
+    */
+   function _safe_fwrite($f, $buf, $len = false) {
+      if ($len === false) {
+         $bytesWritten = fwrite($f, $buf);
+      } else {
+         $bytesWritten = fwrite($f, $buf, $len);
+      }
+      return $bytesWritten;
+   }
+
+   /**
+    * Flush the read buffer of a stream
+    */
+   function _flush_read_buffer($f) {
+      if (!is_resource($f)) {
+         return;
+      }
+      $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0);
+      while ($n == 1 && !feof($f)) {
+         fread($f, 1024);
+         $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0);
+      }
+   }
+
    // }}}
    // }}}
    // }}}
 }
 
+// vim: sts=3 sw=3 et
+
 // }}}
 ?>