'http',
			'encoding' => 'JSON',
			'cacheTTL' => 10,
			'skewTTL' => 1,
			'timeout' => 2
		];

		$this->host = $params['host'];
		$this->protocol = $params['protocol'];
		$this->directory = trim( $params['directory'], '/' );
		$this->encoding = $params['encoding'];
		$this->skewCacheTTL = $params['skewTTL'];
		// The skew is taken out of the base TTL here; load() adds a random share of it
		// back whenever it computes a new expiry.
		$this->baseCacheTTL = max( $params['cacheTTL'] - $this->skewCacheTTL, 0 );
		$this->timeout = $params['timeout'];

		// 'cache' may be absent, an existing BagOStuff, or a spec handed to ObjectFactory
		if ( !isset( $params['cache'] ) ) {
			$this->srvCache = new HashBagOStuff();
		} elseif ( $params['cache'] instanceof BagOStuff ) {
			$this->srvCache = $params['cache'];
		} else {
			$this->srvCache = ObjectFactory::getObjectFromSpec( $params['cache'] );
		}

		$this->logger = new Psr\Log\NullLogger();
		$this->http = new MultiHttpClient( [
			'connTimeout' => $this->timeout,
			'reqTimeout' => $this->timeout,
			'logger' => $this->logger
		] );
	}

	/**
	 * Replace the logger used by this object and by its HTTP client.
	 *
	 * @param LoggerInterface $logger
	 */
	public function setLogger( LoggerInterface $logger ) {
		$this->logger = $logger;
		$this->http->setLogger( $logger );
	}

	/**
	 * Check whether a configuration entry exists.
	 *
	 * Uses array_key_exists(), so an entry whose value is null still counts as present.
	 *
	 * @param string $name Entry name
	 * @return bool
	 * @throws ConfigException If the configuration could not be loaded
	 */
	public function has( $name ) {
		$this->load();

		return array_key_exists( $name, $this->procCache['config'] );
	}

	/**
	 * Get the value of a configuration entry.
	 *
	 * @param string $name Entry name
	 * @return mixed
	 * @throws ConfigException If the configuration could not be loaded or has no such entry
	 */
	public function get( $name ) {
		$this->load();

		if ( !array_key_exists( $name, $this->procCache['config'] ) ) {
			throw new ConfigException( "No entry found for '$name'." );
		}

		return $this->procCache['config'][$name];
	}

	/**
	 * Populate $this->procCache with [ 'config' => array, 'expires' => float ].
	 *
	 * Does nothing if $this->procCache is already set. Otherwise the value comes from
	 * $this->srvCache when present there and, if it is missing or expired, a refresh from
	 * etcd is attempted under a cache lock. A stale cached value is still accepted when
	 * no refresh happened or when a retryable fetch error occurred.
	 *
	 * @throws ConfigException If the wait loop ends without a usable value
	 */
	private function load() {
		if ( $this->procCache !== null ) {
			return; // already loaded
		}

		// Freshness is judged against the time at which this call started
		$now = microtime( true );
		$key = $this->srvCache->makeGlobalKey(
			__CLASS__,
			$this->host,
			$this->directory
		);

		// Get the cached value or block until it is regenerated (by this or another thread)...
		$data = null; // latest config info
		$error = null; // last error message
		$loop = new WaitConditionLoop(
			function () use ( $key, $now, &$data, &$error ) {
				// Check if the values are in cache yet...
				$data = $this->srvCache->get( $key );
				if ( is_array( $data ) && $data['expires'] > $now ) {
					$this->logger->debug( "Found up-to-date etcd configuration cache." );

					return WaitConditionLoop::CONDITION_REACHED;
				}

				// Cache is either empty or stale;
				// refresh the cache from etcd, using a mutex to reduce stampedes...
				// NOTE(review): the 0 is presumably a "do not wait for the lock" timeout;
				// confirm against BagOStuff::lock().
				if ( $this->srvCache->lock( $key, 0, $this->baseCacheTTL ) ) {
					try {
						list( $config, $error, $retry ) = $this->fetchAllFromEtcd();
						if ( is_array( $config ) ) {
							// Avoid having all servers expire cache keys at the same time
							$expiry = microtime( true ) + $this->baseCacheTTL;
							$expiry += mt_rand( 0, 1e6 ) / 1e6 * $this->skewCacheTTL;

							// Expiry is tracked inside the value; the cache entry itself is
							// stored with an indefinite TTL so a stale copy stays available.
							$data = [ 'config' => $config, 'expires' => $expiry ];
							$this->srvCache->set( $key, $data, BagOStuff::TTL_INDEFINITE );

							$this->logger->info( "Refreshed stale etcd configuration cache." );

							return WaitConditionLoop::CONDITION_REACHED;
						} else {
							$this->logger->error( "Failed to fetch configuration: $error" );
							if ( !$retry ) {
								// Fail fast since the error is likely to keep happening
								return WaitConditionLoop::CONDITION_FAILED;
							}
						}
					} finally {
						$this->srvCache->unlock( $key ); // release mutex
					}
				}

				// No fresh value was obtained; fall back to whatever the cache held
				if ( is_array( $data ) ) {
					$this->logger->info( "Using stale etcd configuration cache." );

					return WaitConditionLoop::CONDITION_REACHED;
				}

				return WaitConditionLoop::CONDITION_CONTINUE;
			},
			$this->timeout
		);

		if ( $loop->invoke() !== WaitConditionLoop::CONDITION_REACHED ) {
			// No cached value exists and etcd query failed; throw an error
			throw new ConfigException( "Failed to load configuration from etcd: $error" );
		}

		$this->procCache = $data;
	}

	/**
	 * Fetch the whole configuration directory from etcd.
	 *
	 * Servers are looked up through DnsSrvDiscoverer for $this->host. If none are
	 * returned, $this->host itself is queried. Otherwise servers are tried one at a time,
	 * each failed server being removed from the candidate list, until one returns a
	 * config, an error is flagged as non-retryable, or no candidates remain.
	 *
	 * @return array (config array or null, error string, allow retries)
	 */
	public function fetchAllFromEtcd() {
		$dsd = new DnsSrvDiscoverer( $this->host );
		$servers = $dsd->getServers();
		if ( !$servers ) {
			return $this->fetchAllFromEtcdServer( $this->host );
		}

		do {
			// Pick a random etcd server from dns
			$server = $dsd->pickServer( $servers );
			$host = IP::combineHostAndPort( $server['target'], $server['port'] );
			// Try to load the config from this particular server
			list( $config, $error, $retry ) = $this->fetchAllFromEtcdServer( $host );
			if ( is_array( $config ) || !$retry ) {
				break;
			}

			// Avoid the server next time if that failed
			$servers = $dsd->removeServer( $server, $servers );
		} while ( $servers );

		return [ $config, $error, $retry ];
	}

	/**
	 * Fetch the configuration directory from a single etcd server.
	 *
	 * Issues a GET for "/v2/keys/<directory>/" and builds a map of entry name to value
	 * from the non-directory nodes in the response. Each node value must decode (see
	 * unserialize()) to an array with a 'val' key. Any structural problem in the
	 * response is reported as a non-retryable error instead of a partial or empty
	 * configuration.
	 *
	 * @param string $address Host and port
	 * @return array (config array or null, error string, whether to allow retries)
	 */
	protected function fetchAllFromEtcdServer( $address ) {
		// Retrieve all the values under the MediaWiki config directory
		list( $rcode, $rdesc, /* $rhdrs */, $rbody, $rerr ) = $this->http->run( [
			'method' => 'GET',
			'url' => "{$this->protocol}://{$address}/v2/keys/{$this->directory}/",
			'headers' => [ 'content-type' => 'application/json' ]
		] );

		// HTTP codes for which retrying is pointless
		static $terminalCodes = [ 404 => true ];
		if ( $rcode < 200 || $rcode > 399 ) {
			return [
				null,
				strlen( $rerr ) ? $rerr : "HTTP $rcode ($rdesc)",
				empty( $terminalCodes[$rcode] )
			];
		}

		$info = json_decode( $rbody, true );
		if ( $info === null || !isset( $info['node']['nodes'] ) ) {
			return [ null, "Unexpected JSON response; missing 'nodes' list.", false ];
		}

		$nodes = $info['node']['nodes'];
		if ( !is_array( $nodes ) ) {
			// isset() alone would let a scalar through; foreach would then only emit a
			// warning and an empty config would be returned (and cached) as a success.
			return [ null, "Unexpected JSON response; 'nodes' is not an array.", false ];
		}

		$config = [];
		foreach ( $nodes as $node ) {
			if ( !is_array( $node ) ) {
				return [ null, "Unexpected JSON response; malformed entry in 'nodes' list.", false ];
			}
			if ( !empty( $node['dir'] ) ) {
				continue; // skip directories
			}
			if ( !isset( $node['key'] ) ) {
				return [ null, "Unexpected JSON response; malformed entry in 'nodes' list.", false ];
			}

			$name = basename( $node['key'] );
			// A node without a value is reported as a parse failure, without a PHP notice
			$value = isset( $node['value'] ) ? $this->unserialize( $node['value'] ) : null;
			if ( !is_array( $value ) || !array_key_exists( 'val', $value ) ) {
				return [ null, "Failed to parse value for '$name'.", false ];
			}

			$config[$name] = $value['val'];
		}

		return [ $config, null, false ];
	}

	/**
	 * Decode a stored node value according to $this->encoding.
	 *
	 * 'YAML' selects yaml_parse(); any other encoding is decoded as JSON into
	 * associative arrays.
	 *
	 * @param string $string
	 * @return mixed
	 */
	private function unserialize( $string ) {
		if ( $this->encoding === 'YAML' ) {
			// NOTE(review): this parses data fetched from a remote service. yaml_parse()
			// can unserialize PHP objects when the yaml.decode_php ini setting is on;
			// confirm it is disabled wherever the YAML encoding is used.
			return yaml_parse( $string );
		} else { // JSON
			return json_decode( $string, true );
		}
	}
}