From 5800c37533d8183cfed62e8f5838006bfc43a8af Mon Sep 17 00:00:00 2001 From: Tim Starling Date: Wed, 8 Aug 2012 17:31:41 +1000 Subject: [PATCH] Added a Redis client for object cache storage With sharding, failover and consistent hashing. Only supports the extension (phpredis), since the pure PHP library Predis had about a billion files and it made my eyes hurt. Change-Id: I90fb4a41d15265b9c22f8e32ecd1d956f89f3ce8 --- RELEASE-NOTES-1.20 | 1 + includes/AutoLoader.php | 1 + includes/objectcache/BagOStuff.php | 19 ++ includes/objectcache/RedisBagOStuff.php | 392 ++++++++++++++++++++++++ 4 files changed, 413 insertions(+) create mode 100644 includes/objectcache/RedisBagOStuff.php diff --git a/RELEASE-NOTES-1.20 b/RELEASE-NOTES-1.20 index 92e1e765aa..d63d3471e6 100644 --- a/RELEASE-NOTES-1.20 +++ b/RELEASE-NOTES-1.20 @@ -113,6 +113,7 @@ upgrade PHP if you have not done so prior to upgrading MediaWiki. renamed to $wgSessionsInObjectCache, with the old name retained for backwards compatibility. When this feature is enabled, the expiry time can now be configured with $wgObjectCacheSessionExpiry. +* Added a Redis client for object caching. * Implemented mw.user.getGroups for getting and caching user groups. * (bug 37830) Added $wgRequirePasswordforEmailChange to control whether password confirmation is required for changing an email address or not. diff --git a/includes/AutoLoader.php b/includes/AutoLoader.php index e50430eb23..8397555ea3 100644 --- a/includes/AutoLoader.php +++ b/includes/AutoLoader.php @@ -732,6 +732,7 @@ $wgAutoloadLocalClasses = array( 'MWMemcached' => 'includes/objectcache/MemcachedClient.php', 'ObjectCache' => 'includes/objectcache/ObjectCache.php', 'ObjectCacheSessionHandler' => 'includes/objectcache/ObjectCacheSessionHandler.php', + 'RedisBagOStuff' => 'includes/objectcache/RedisBagOStuff.php', 'SqlBagOStuff' => 'includes/objectcache/SqlBagOStuff.php', 'WinCacheBagOStuff' => 'includes/objectcache/WinCacheBagOStuff.php', 'XCacheBagOStuff' => 'includes/objectcache/XCacheBagOStuff.php', diff --git a/includes/objectcache/BagOStuff.php b/includes/objectcache/BagOStuff.php index 0aebfa311b..fcc3aa9d64 100644 --- a/includes/objectcache/BagOStuff.php +++ b/includes/objectcache/BagOStuff.php @@ -216,4 +216,23 @@ abstract class BagOStuff { return $exptime; } } + + /** + * Convert an optionally absolute expiry time to a relative time. If an + * absolute time is specified which is in the past, use a short expiry time. + * + * @param $exptime integer + * @return integer + */ + protected function convertToRelative( $exptime ) { + if ( $exptime >= 86400 * 3650 /* 10 years */ ) { + $exptime -= time(); + if ( $exptime <= 0 ) { + $exptime = 1; + } + return $exptime; + } else { + return $exptime; + } + } } diff --git a/includes/objectcache/RedisBagOStuff.php b/includes/objectcache/RedisBagOStuff.php new file mode 100644 index 0000000000..67a3337dfc --- /dev/null +++ b/includes/objectcache/RedisBagOStuff.php @@ -0,0 +1,392 @@ +servers = $params['servers']; + $this->connectTimeout = isset( $params['connectTimeout'] ) + ? $params['connectTimeout'] : 1; + $this->persistent = !empty( $params['persistent'] ); + if ( isset( $params['password'] ) ) { + $this->password = $params['password']; + } + if ( isset( $params['automaticFailover'] ) ) { + $this->automaticFailover = $params['automaticFailover']; + } else { + $this->automaticFailover = true; + } + } + + public function get( $key ) { + wfProfileIn( __METHOD__ ); + list( $server, $conn ) = $this->getConnection( $key ); + if ( !$conn ) { + wfProfileOut( __METHOD__ ); + return false; + } + try { + $result = $conn->get( $key ); + } catch ( RedisException $e ) { + $result = false; + $this->handleException( $server, $e ); + } + $this->logRequest( 'get', $key, $server, $result ); + wfProfileOut( __METHOD__ ); + return $result; + } + + public function set( $key, $value, $expiry = 0 ) { + wfProfileIn( __METHOD__ ); + list( $server, $conn ) = $this->getConnection( $key ); + if ( !$conn ) { + wfProfileOut( __METHOD__ ); + return false; + } + $expiry = $this->convertToRelative( $expiry ); + try { + if ( !$expiry ) { + // No expiry, that is very different from zero expiry in Redis + $result = $conn->set( $key, $value ); + } else { + $result = $conn->setex( $key, $expiry, $value ); + } + } catch ( RedisException $e ) { + $result = false; + $this->handleException( $server, $e ); + } + + $this->logRequest( 'set', $key, $server, $result ); + wfProfileOut( __METHOD__ ); + return $result; + } + + public function delete( $key, $time = 0 ) { + wfProfileIn( __METHOD__ ); + list( $server, $conn ) = $this->getConnection( $key ); + if ( !$conn ) { + wfProfileOut( __METHOD__ ); + return false; + } + try { + $conn->delete( $key ); + // Return true even if the key didn't exist + $result = true; + } catch ( RedisException $e ) { + $result = false; + $this->handleException( $server, $e ); + } + $this->logRequest( 'delete', $key, $server, $result ); + wfProfileOut( __METHOD__ ); + return $result; + } + + public function getMulti( array $keys ) { + wfProfileIn( __METHOD__ ); + $batches = array(); + $conns = array(); + foreach ( $keys as $key ) { + list( $server, $conn ) = $this->getConnection( $key ); + if ( !$conn ) { + continue; + } + $conns[$server] = $conn; + $batches[$server][] = $key; + } + $result = array(); + foreach ( $batches as $server => $batchKeys ) { + $conn = $conns[$server]; + try { + $conn->multi( Redis::PIPELINE ); + foreach ( $batchKeys as $key ) { + $conn->get( $key ); + } + $batchResult = $conn->exec(); + if ( $batchResult === false ) { + $this->debug( "multi request to $server failed" ); + continue; + } + foreach ( $batchResult as $i => $value ) { + if ( $value !== false ) { + $result[$batchKeys[$i]] = $value; + } + } + } catch ( RedisException $e ) { + $this->handleException( $server, $e ); + } + } + + $this->debug( "getMulti for " . count( $keys ) . " keys " . + "returned " . count( $result ) . " results" ); + wfProfileOut( __METHOD__ ); + return $result; + } + + public function add( $key, $value, $expiry = 0 ) { + wfProfileIn( __METHOD__ ); + list( $server, $conn ) = $this->getConnection( $key ); + if ( !$conn ) { + wfProfileOut( __METHOD__ ); + return false; + } + $expiry = $this->convertToRelative( $expiry ); + try { + $result = $conn->setnx( $key, $value ); + if ( $result && $expiry ) { + $conn->expire( $key, $expiry ); + } + } catch ( RedisException $e ) { + $result = false; + $this->handleException( $server, $e ); + } + $this->logRequest( 'add', $key, $server, $result ); + wfProfileOut( __METHOD__ ); + return $result; + } + + /** + * Non-atomic implementation of replace(). Could perhaps be done atomically + * with WATCH or scripting, but this function is rarely used. + */ + public function replace( $key, $value, $expiry = 0 ) { + wfProfileIn( __METHOD__ ); + list( $server, $conn ) = $this->getConnection( $key ); + if ( !$conn ) { + wfProfileOut( __METHOD__ ); + return false; + } + if ( !$conn->exists( $key ) ) { + wfProfileOut( __METHOD__ ); + return false; + } + + $expiry = $this->convertToRelative( $expiry ); + try { + if ( !$expiry ) { + $result = $conn->set( $key, $value ); + } else { + $result = $conn->setex( $key, $expiry, $value ); + } + } catch ( RedisException $e ) { + $result = false; + $this->handleException( $server, $e ); + } + + $this->logRequest( 'replace', $key, $server, $result ); + wfProfileOut( __METHOD__ ); + return $result; + } + + /** + * Non-atomic implementation of incr(). + * + * Probably all callers actually want incr() to atomically initialise + * values to zero if they don't exist, as provided by the Redis INCR + * command. But we are constrained by the memcached-like interface to + * return null in that case. Once the key exists, further increments are + * atomic. + */ + public function incr( $key, $value = 1 ) { + wfProfileIn( __METHOD__ ); + list( $server, $conn ) = $this->getConnection( $key ); + if ( !$conn ) { + wfProfileOut( __METHOD__ ); + return false; + } + if ( !$conn->exists( $key ) ) { + wfProfileOut( __METHOD__ ); + return null; + } + try { + $result = $conn->incrBy( $key, $value ); + } catch ( RedisException $e ) { + $result = false; + $this->handleException( $server, $e ); + } + + $this->logRequest( 'incr', $key, $server, $result ); + wfProfileOut( __METHOD__ ); + return $result; + } + + /** + * Get a Redis object with a connection suitable for fetching the specified key + */ + protected function getConnection( $key ) { + if ( count( $this->servers ) === 1 ) { + $candidates = $this->servers; + } else { + // Use consistent hashing + $hashes = array(); + foreach ( $this->servers as $server ) { + $hashes[$server] = md5( $server . '/' . $key ); + } + asort( $hashes ); + if ( !$this->automaticFailover ) { + reset( $hashes ); + $candidates = array( key( $hashes ) ); + } else { + $candidates = array_keys( $hashes ); + } + } + + foreach ( $candidates as $server ) { + $conn = $this->getConnectionToServer( $server ); + if ( $conn ) { + return array( $server, $conn ); + } + } + return array( false, false ); + } + + /** + * Get a connection to the server with the specified name. Connections + * are cached, and failures are persistent to avoid multiple timeouts. + * + * @return Redis object, or false on failure + */ + protected function getConnectionToServer( $server ) { + if ( isset( $this->deadServers[$server] ) ) { + $now = time(); + if ( $now > $this->deadServers[$server] ) { + // Dead time expired + unset( $this->deadServers[$server] ); + } else { + // Server is dead + $this->debug( "server $server is marked down for another " . + ($this->deadServers[$server] - $now ) . + " seconds, can't get connection" ); + return false; + } + } + + if ( isset( $this->conns[$server] ) ) { + return $this->conns[$server]; + } + + if ( substr( $server, 0, 1 ) === '/' ) { + // UNIX domain socket + // These are required by the redis extension to start with a slash, but + // we still need to set the port to a special value to make it work. + $host = $server; + $port = 0; + } else { + // TCP connection + $hostPort = IP::splitHostAndPort( $server ); + if ( !$hostPort ) { + throw new MWException( __CLASS__.": invalid configured server \"$server\"" ); + } + list( $host, $port ) = $hostPort; + if ( $port === false ) { + $port = 6379; + } + } + $conn = new Redis; + try { + if ( $this->persistent ) { + $this->debug( "opening persistent connection to $host:$port" ); + $result = $conn->pconnect( $host, $port, $this->connectTimeout ); + } else { + $this->debug( "opening non-persistent connection to $host:$port" ); + $result = $conn->connect( $host, $port, $this->connectTimeout ); + } + if ( !$result ) { + $this->logError( "could not connect to server $server" ); + // Mark server down for 30s to avoid further timeouts + $this->deadServers[$server] = time() + 30; + return false; + } + if ( $this->password !== null ) { + if ( !$conn->auth( $this->password ) ) { + $this->logError( "authentication error connecting to $server" ); + } + } + } catch ( RedisException $e ) { + $this->deadServers[$server] = time() + 30; + wfDebugLog( 'redis', "Redis exception: " . $e->getMessage() . "\n" ); + return false; + } + + $conn->setOption( Redis::OPT_SERIALIZER, Redis::SERIALIZER_PHP ); + $this->conns[$server] = $conn; + return $conn; + } + + /** + * Log a fatal error + */ + protected function logError( $msg ) { + wfDebugLog( 'redis', "Redis error: $msg\n" ); + } + + /** + * The redis extension throws an exception in response to various read, write + * and protocol errors. Sometimes it also closes the connection, sometimes + * not. The safest response for us is to explicitly destroy the connection + * object and let it be reopened during the next request. + */ + protected function handleException( $server, $e ) { + wfDebugLog( 'redis', "Redis exception on server $server: " . $e->getMessage() . "\n" ); + unset( $this->conns[$server] ); + } + + /** + * Send information about a single request to the debug log + */ + public function logRequest( $method, $key, $server, $result ) { + $this->debug( "$method $key on $server: " . + ( $result === false ? "failure" : "success" ) ); + } +} + -- 2.20.1