(bug 5378) General logs link in Special:Contributions
[lhc/web/wiklou.git] / includes / memcached-client.php
index 363c610..697509e 100644 (file)
@@ -43,9 +43,9 @@
  * Usage example:
  *
  * require_once 'memcached.php';
- * 
+ *
  * $mc = new memcached(array(
- *              'servers' => array('127.0.0.1:10000', 
+ *              'servers' => array('127.0.0.1:10000',
  *                                 array('192.0.0.1:10010', 2),
  *                                 '127.0.0.1:10020'),
  *              'debug'   => false,
@@ -105,7 +105,7 @@ class memcached
     * @access  public
     */
    var $stats;
-   
+
    // }}}
    // {{{ private
 
@@ -116,7 +116,7 @@ class memcached
     * @access  private
     */
    var $_cache_sock;
-   
+
    /**
     * Current debug status; 0 - none to 9 - profiling
     *
@@ -124,7 +124,7 @@ class memcached
     * @access  private
     */
    var $_debug;
-   
+
    /**
     * Dead hosts, assoc array, 'host'=>'unixtime when ok to check again'
     *
@@ -132,7 +132,7 @@ class memcached
     * @access  private
     */
    var $_host_dead;
-   
+
    /**
     * Is compression available?
     *
@@ -140,7 +140,7 @@ class memcached
     * @access  private
     */
    var $_have_zlib;
-   
+
    /**
     * Do we want to use compression?
     *
@@ -148,7 +148,7 @@ class memcached
     * @access  private
     */
    var $_compress_enable;
-   
+
    /**
     * At how many bytes should we compress?
     *
@@ -156,7 +156,7 @@ class memcached
     * @access  private
     */
    var $_compress_threshold;
-   
+
    /**
     * Are we using persistant links?
     *
@@ -164,7 +164,7 @@ class memcached
     * @access  private
     */
    var $_persistant;
-   
+
    /**
     * If only using one server; contains ip:port to connect to
     *
@@ -172,7 +172,7 @@ class memcached
     * @access  private
     */
    var $_single_sock;
-   
+
    /**
     * Array containing ip:port or array(ip:port, weight)
     *
@@ -180,7 +180,7 @@ class memcached
     * @access  private
     */
    var $_servers;
-   
+
    /**
     * Our bit buckets
     *
@@ -188,7 +188,7 @@ class memcached
     * @access  private
     */
    var $_buckets;
-   
+
    /**
     * Total # of bit buckets we have
     *
@@ -196,7 +196,7 @@ class memcached
     * @access  private
     */
    var $_bucketcount;
-   
+
    /**
     * # of total servers we have
     *
@@ -205,6 +205,22 @@ class memcached
     */
    var $_active;
 
+   /**
+    * Stream timeout in seconds. Applies for example to fread()
+    *
+    * @var     integer
+    * @access  private
+    */
+   var $_timeout_seconds;
+
+   /**
+    * Stream timeout in microseconds
+    *
+    * @var     integer
+    * @access  private
+    */
+   var $_timeout_microseconds;
+
    // }}}
    // }}}
    // {{{ methods
@@ -221,23 +237,26 @@ class memcached
     */
    function memcached ($args)
    {
-      $this->set_servers($args['servers']);
-      $this->_debug = $args['debug'];
+      $this->set_servers(@$args['servers']);
+      $this->_debug = @$args['debug'];
       $this->stats = array();
-      $this->_compress_threshold = $args['compress_threshold'];
-      $this->_persistant = isset($args['persistant']) ? $args['persistant'] : false;
+      $this->_compress_threshold = @$args['compress_threshold'];
+      $this->_persistant = array_key_exists('persistant', $args) ? (@$args['persistant']) : false;
       $this->_compress_enable = true;
       $this->_have_zlib = function_exists("gzcompress");
-      
+
       $this->_cache_sock = array();
       $this->_host_dead = array();
+
+      $this->_timeout_seconds = 1;
+      $this->_timeout_microseconds = 0;
    }
 
    // }}}
    // {{{ add()
 
    /**
-    * Adds a key/value to the memcache server if one isn't already set with 
+    * Adds a key/value to the memcache server if one isn't already set with
     * that key
     *
     * @param   string   $key     Key to set with data
@@ -285,25 +304,25 @@ class memcached
    {
       if (!$this->_active)
          return false;
-         
+
       $sock = $this->get_sock($key);
       if (!is_resource($sock))
          return false;
-      
+
       $key = is_array($key) ? $key[1] : $key;
-      
-      $this->stats['delete']++;
+
+      @$this->stats['delete']++;
       $cmd = "delete $key $time\r\n";
-      if(!fwrite($sock, $cmd, strlen($cmd)))
+      if(!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
       {
          $this->_dead_sock($sock);
          return false;
       }
       $res = trim(fgets($sock));
-      
+
       if ($this->_debug)
-         printf("MemCache: delete %s (%s)\n", $key, $res);
-      
+         $this->_debugprint(sprintf("MemCache: delete %s (%s)\n", $key, $res));
+
       if ($res == "DELETED")
          return true;
       return false;
@@ -366,31 +385,40 @@ class memcached
     */
    function get ($key)
    {
-      if (!$this->_active)
+      $fname = 'memcached::get';
+      wfProfileIn( $fname );
+
+      if (!$this->_active) {
+        wfProfileOut( $fname );
          return false;
-         
+      }
+
       $sock = $this->get_sock($key);
-      
-      if (!is_resource($sock))
+
+      if (!is_resource($sock)) {
+        wfProfileOut( $fname );
          return false;
-         
-      $this->stats['get']++;
-      
+      }
+
+      @$this->stats['get']++;
+
       $cmd = "get $key\r\n";
-      if (!fwrite($sock, $cmd, strlen($cmd)))
+      if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
       {
          $this->_dead_sock($sock);
+        wfProfileOut( $fname );
          return false;
       }
-      
+
       $val = array();
       $this->_load_items($sock, $val);
-      
+
       if ($this->_debug)
          foreach ($val as $k => $v)
-            printf("MemCache: sock %s got %s => %s\r\n", $sock, $k, $v);
-      
-      return $val[$key];
+            $this->_debugprint(@sprintf("MemCache: sock %s got %s => %s\r\n", serialize($sock), $k, $v));
+
+      wfProfileOut( $fname );
+      return @$val[$key];
    }
 
    // }}}
@@ -408,9 +436,9 @@ class memcached
    {
       if (!$this->_active)
          return false;
-         
+
       $this->stats['get_multi']++;
-      
+
       foreach ($keys as $key)
       {
          $sock = $this->get_sock($key);
@@ -423,7 +451,7 @@ class memcached
          }
          $sock_keys[$sock][] = $key;
       }
-      
+
       // Send out the requests
       foreach ($socks as $sock)
       {
@@ -433,8 +461,8 @@ class memcached
             $cmd .= " ". $key;
          }
          $cmd .= "\r\n";
-         
-         if (fwrite($sock, $cmd, strlen($cmd)))
+
+         if ($this->_safe_fwrite($sock, $cmd, strlen($cmd)))
          {
             $gather[] = $sock;
          } else
@@ -442,18 +470,18 @@ class memcached
             $this->_dead_sock($sock);
          }
       }
-      
+
       // Parse responses
       $val = array();
       foreach ($gather as $sock)
       {
          $this->_load_items($sock, $val);
       }
-      
+
       if ($this->_debug)
          foreach ($val as $k => $v)
-            printf("MemCache: got %s => %s\r\n", $k, $v);
-            
+            $this->_debugprint(sprintf("MemCache: got %s => %s\r\n", $k, $v));
+
       return $val;
    }
 
@@ -496,7 +524,7 @@ class memcached
    // {{{ run_command()
 
    /**
-    * Passes through $cmd to the memcache server connected by $sock; returns 
+    * Passes through $cmd to the memcache server connected by $sock; returns
     * output as an array (null array if no output)
     *
     * NOTE: due to a possible bug in how PHP reads while using fgets(), each
@@ -515,10 +543,10 @@ class memcached
    {
       if (!is_resource($sock))
          return array();
-      
-      if (!fwrite($sock, $cmd, strlen($cmd)))
+
+      if (!$this->_safe_fwrite($sock, $cmd, strlen($cmd)))
          return array();
-         
+
       while (true)
       {
          $res = fgets($sock);
@@ -600,12 +628,26 @@ class memcached
       $this->_active = count($list);
       $this->_buckets = null;
       $this->_bucketcount = 0;
-      
+
       $this->_single_sock = null;
       if ($this->_active == 1)
          $this->_single_sock = $this->_servers[0];
    }
 
+   /**
+    * Sets the timeout for new connections
+    *
+    * @param   integer  $seconds Number of seconds
+    * @param   integer  $microseconds  Number of microseconds
+    *
+    * @access  public
+    */
+   function set_timeout ($seconds, $microseconds)
+   {
+      $this->_timeout_seconds = $seconds;
+      $this->_timeout_microseconds = $microseconds;
+   }
+
    // }}}
    // }}}
    // {{{ private methods
@@ -648,9 +690,16 @@ class memcached
       {
          $sock = @fsockopen($ip, $port, $errno, $errstr, $timeout);
       }
-      
-      if (!$sock)
+
+      if (!$sock) {
+         if ($this->_debug)
+            $this->_debugprint( "Error connecting to $host: $errstr\n" );
          return false;
+      }
+
+      // Initialise timeout
+      stream_set_timeout($sock, $this->_timeout_seconds, $this->_timeout_microseconds);
+
       return true;
    }
 
@@ -667,7 +716,7 @@ class memcached
    function _dead_sock ($sock)
    {
       $host = array_search($sock, $this->_cache_sock);
-      list ($ip, $port) = explode(":", $host);
+      @list ($ip, $port) = explode(":", $host);
       $this->_host_dead[$ip] = time() + 30 + intval(rand(0, 10));
       $this->_host_dead[$host] = $this->_host_dead[$ip];
       unset($this->_cache_sock[$host]);
@@ -689,11 +738,13 @@ class memcached
       if (!$this->_active)
          return false;
 
-      if ($this->_single_sock !== null)
+      if ($this->_single_sock !== null) {
+         $this->_flush_read_buffer($this->_single_sock);
          return $this->sock_to_host($this->_single_sock);
-      
+      }
+
       $hv = is_array($key) ? intval($key[0]) : $this->_hashfunc($key);
-      
+
       if ($this->_buckets === null)
       {
          foreach ($this->_servers as $v)
@@ -710,17 +761,19 @@ class memcached
          $this->_buckets = $bu;
          $this->_bucketcount = count($bu);
       }
-      
+
       $realkey = is_array($key) ? $key[1] : $key;
       for ($tries = 0; $tries<20; $tries++)
       {
          $host = $this->_buckets[$hv % $this->_bucketcount];
          $sock = $this->sock_to_host($host);
-         if (is_resource($sock))
+         if (is_resource($sock)) {
+            $this->_flush_read_buffer($sock);
             return $sock;
+         }
          $hv += $this->_hashfunc($tries . $realkey);
       }
-      
+
       return false;
    }
 
@@ -737,13 +790,10 @@ class memcached
     */
    function _hashfunc ($key)
    {
-      $hash = 0;
-      for ($i=0; $i<strlen($key); $i++)
-      {
-         $hash = $hash*33 + ord($key[$i]);
-      }
-      
-      return $hash;
+      # Hash function must on [0,0x7ffffff]
+      # We take the first 31 bits of the MD5 hash, which unlike the hash
+      # function used in a previous version of this client, works
+      return hexdec(substr(md5($key),0,8)) & 0x7fffffff;
    }
 
    // }}}
@@ -763,16 +813,16 @@ class memcached
    {
       if (!$this->_active)
          return null;
-         
+
       $sock = $this->get_sock($key);
       if (!is_resource($sock))
          return null;
-         
+
       $key = is_array($key) ? $key[1] : $key;
-      $this->stats[$cmd]++;
-      if (!fwrite($sock, "$cmd $key $amt\r\n"))
+      @$this->stats[$cmd]++;
+      if (!$this->_safe_fwrite($sock, "$cmd $key $amt\r\n"))
          return $this->_dead_sock($sock);
-         
+
       stream_set_timeout($sock, 1, 0);
       $line = fgets($sock);
       if (!preg_match('/^(\d+)/', $line, $match))
@@ -804,7 +854,7 @@ class memcached
             list($rkey, $flags, $len) = array($match[1], $match[2], $match[3]);
             $bneed = $len+2;
             $offset = 0;
-            
+
             while ($bneed > 0)
             {
                $data = fread($sock, $bneed);
@@ -813,31 +863,31 @@ class memcached
                   break;
                $offset += $n;
                $bneed -= $n;
-               $ret[$rkey] .= $data;
+               @$ret[$rkey] .= $data;
             }
-            
+
             if ($offset != $len+2)
             {
                // Something is borked!
                if ($this->_debug)
-                  printf("Something is borked!  key %s expecting %d got %d length\n", $rkey, $len+2, $offset);
+                  $this->_debugprint(sprintf("Something is borked!  key %s expecting %d got %d length\n", $rkey, $len+2, $offset));
 
                unset($ret[$rkey]);
                $this->_close_sock($sock);
                return false;
             }
-            
-            $ret[$rkey] = rtrim($ret[$rkey]);
 
             if ($this->_have_zlib && $flags & MEMCACHE_COMPRESSED)
                $ret[$rkey] = gzuncompress($ret[$rkey]);
 
+            $ret[$rkey] = rtrim($ret[$rkey]);
+
             if ($flags & MEMCACHE_SERIALIZED)
                $ret[$rkey] = unserialize($ret[$rkey]);
 
-         } else 
+         } else
          {
-            print("Error parsing memcached response\n");
+            $this->_debugprint("Error parsing memcached response\n");
             return 0;
          }
       }
@@ -861,50 +911,50 @@ class memcached
    {
       if (!$this->_active)
          return false;
-         
+
       $sock = $this->get_sock($key);
       if (!is_resource($sock))
          return false;
-         
-      $this->stats[$cmd]++;
-      
+
+      @$this->stats[$cmd]++;
+
       $flags = 0;
-      
+
       if (!is_scalar($val))
       {
          $val = serialize($val);
          $flags |= MEMCACHE_SERIALIZED;
          if ($this->_debug)
-            printf("client: serializing data as it is not scalar\n");
+            $this->_debugprint(sprintf("client: serializing data as it is not scalar\n"));
       }
-      
+
       $len = strlen($val);
-      
-      if ($this->_have_zlib && $this->_compress_enable && 
+
+      if ($this->_have_zlib && $this->_compress_enable &&
           $this->_compress_threshold && $len >= $this->_compress_threshold)
       {
          $c_val = gzcompress($val, 9);
          $c_len = strlen($c_val);
-         
-         if ($c_len < $len*(1 - COMPRESS_SAVINGS))
+
+         if ($c_len < $len*(1 - COMPRESSION_SAVINGS))
          {
             if ($this->_debug)
-               printf("client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len);
+               $this->_debugprint(sprintf("client: compressing data; was %d bytes is now %d bytes\n", $len, $c_len));
             $val = $c_val;
             $len = $c_len;
             $flags |= MEMCACHE_COMPRESSED;
          }
       }
-      if (!fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n"))
+      if (!$this->_safe_fwrite($sock, "$cmd $key $flags $exp $len\r\n$val\r\n"))
          return $this->_dead_sock($sock);
-         
+
       $line = trim(fgets($sock));
-      
+
       if ($this->_debug)
       {
          if ($flags & MEMCACHE_COMPRESSED)
             $val = 'compressed data';
-         printf("MemCache: %s %s => %s (%s)\n", $cmd, $key, $val, $line);
+         $this->_debugprint(sprintf("MemCache: %s %s => %s (%s)\n", $cmd, $key, $val, $line));
       }
       if ($line == "STORED")
          return true;
@@ -926,28 +976,85 @@ class memcached
    {
       if (isset($this->_cache_sock[$host]))
          return $this->_cache_sock[$host];
-      
+
       $now = time();
       list ($ip, $port) = explode (":", $host);
       if (isset($this->_host_dead[$host]) && $this->_host_dead[$host] > $now ||
           isset($this->_host_dead[$ip]) && $this->_host_dead[$ip] > $now)
          return null;
-         
+
       if (!$this->_connect_sock($sock, $host))
          return $this->_dead_sock($host);
-         
+
       // Do not buffer writes
       stream_set_write_buffer($sock, 0);
-      
+
       $this->_cache_sock[$host] = $sock;
-      
+
       return $this->_cache_sock[$host];
    }
 
+   function _debugprint($str){
+      print($str);
+   }
+
+   /**
+    * Write to a stream, timing out after the correct amount of time
+    *
+    * @return bool false on failure, true on success
+    */
+    /*
+   function _safe_fwrite($f, $buf, $len = false) {
+      stream_set_blocking($f, 0);
+
+      if ($len === false) {
+         wfDebug("Writing " . strlen( $buf ) . " bytes\n");
+         $bytesWritten = fwrite($f, $buf);
+      } else {
+         wfDebug("Writing $len bytes\n");
+         $bytesWritten = fwrite($f, $buf, $len);
+      }
+      $n = stream_select($r=NULL, $w = array($f), $e = NULL, 10, 0);
+      #   $this->_timeout_seconds, $this->_timeout_microseconds);
+
+      wfDebug("stream_select returned $n\n");
+      stream_set_blocking($f, 1);
+      return $n == 1;
+      return $bytesWritten;
+   }*/
+
+   /**
+    * Original behaviour
+    */
+   function _safe_fwrite($f, $buf, $len = false) {
+      if ($len === false) {
+         $bytesWritten = fwrite($f, $buf);
+      } else {
+         $bytesWritten = fwrite($f, $buf, $len);
+      }
+      return $bytesWritten;
+   }
+
+   /**
+    * Flush the read buffer of a stream
+    */
+   function _flush_read_buffer($f) {
+      if (!is_resource($f)) {
+         return;
+      }
+      $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0);
+      while ($n == 1 && !feof($f)) {
+         fread($f, 1024);
+         $n = stream_select($r=array($f), $w = NULL, $e = NULL, 0, 0);
+      }
+   }
+
    // }}}
    // }}}
    // }}}
 }
 
+// vim: sts=3 sw=3 et
+
 // }}}
 ?>