* (bug 21551) Rewrote the Squid purge HTTP client to provide a more robust and genera...
authorTim Starling <tstarling@users.mediawiki.org>
Fri, 5 Feb 2010 03:36:04 +0000 (03:36 +0000)
committerTim Starling <tstarling@users.mediawiki.org>
Fri, 5 Feb 2010 03:36:04 +0000 (03:36 +0000)
* Reverted what was left of r59178
* Removed $wgDebugSquid, didn't do anything anyway

CREDITS
RELEASE-NOTES
includes/AutoLoader.php
includes/DefaultSettings.php
includes/SquidPurgeClient.php [new file with mode: 0644]
includes/SquidUpdate.php

diff --git a/CREDITS b/CREDITS
index 71ef151..e24bab9 100644 (file)
--- a/CREDITS
+++ b/CREDITS
@@ -112,7 +112,6 @@ following names for their contribution to the product.
 * RenĂ© Kijewski
 * Robert Treat
 * RockMFR
-* Roi Avinoam
 * ST47
 * Scott Colcord
 * Simon Walker
index e5cee93..292dbe4 100644 (file)
@@ -671,7 +671,9 @@ it from source control: http://www.mediawiki.org/wiki/Download_from_SVN
 * (bug 19391) Fix caching for Recent ChangesFeed.
 * (bug 21455) Fixed "Watch this page" checkbox appearing on some special pages
   even to non-logged in users
-* (bug 21551) Make Squid reponse limit configurable
+* (bug 21551) Rewrote the Squid purge HTTP client to provide a more robust and
+  general implementation of HTTP, allowing it to purge non-Squid caches such as
+  Varnish.
 * Fixed corruption of long UDP debug log messages by using socket_sendto()
   instead of fsockopen() with fwrite().
 * (bug 16884) Fixed feed links in sidebar not complying with URL parameters
index c3fe7b1..7cd9b58 100644 (file)
@@ -214,6 +214,8 @@ $wgAutoloadLocalClasses = array(
        'SpecialRedirectToSpecial' => 'includes/SpecialPage.php',
        'SqlBagOStuff' => 'includes/BagOStuff.php',
        'SquidUpdate' => 'includes/SquidUpdate.php',
+       'SquidPurgeClient' => 'includes/SquidPurgeClient.php',
+       'SquidPurgeClientPool' => 'includes/SquidPurgeClient.php',
        'Status' => 'includes/Status.php',
        'StubContLang' => 'includes/StubObject.php',
        'StubUser' => 'includes/StubObject.php',
index 3b419c0..a7e57d7 100644 (file)
@@ -1834,11 +1834,6 @@ $wgSquidServers = array();
  */
 $wgSquidServersNoPurge = array();
 
-/**
- * Default character limit for squid purge responses
- */
-$wgSquidResponseLimit = 250;
-
 /** Maximum number of titles to purge in any one client operation */
 $wgMaxSquidPurgeTitles = 400;
 
@@ -2011,8 +2006,6 @@ $wgUDPProfilerPort = '3811';
 $wgDebugProfiling = false;
 /** Output debug message on every wfProfileIn/wfProfileOut */
 $wgDebugFunctionEntry = 0;
-/** Lots of debugging output from SquidUpdate.php */
-$wgDebugSquid = false;
 
 /*
  * Destination for wfIncrStats() data...
diff --git a/includes/SquidPurgeClient.php b/includes/SquidPurgeClient.php
new file mode 100644 (file)
index 0000000..65da5c1
--- /dev/null
@@ -0,0 +1,380 @@
+<?php
+/**
+ * An HTTP 1.0 client built for the purposes of purging Squid and Varnish. 
+ * Uses asynchronous I/O, allowing purges to be done in a highly parallel 
+ * manner. 
+ *
+ * Could be replaced by curl_multi_exec() or some such.
+ */
+class SquidPurgeClient {
+       var $host, $port, $ip;
+
+       var $readState = 'idle';
+       var $writeBuffer = '';
+       var $requests = array();
+       var $currentRequestIndex;
+
+       const EINTR = 4;
+       const EAGAIN = 11;
+       const EINPROGRESS = 115;
+       const BUFFER_SIZE = 8192;
+
+       /**
+        * The socket resource, or null for unconnected, or false for disabled due to error
+        */
+       var $socket;
+       
+       public function __construct( $server, $options = array() ) {
+               $parts = explode( ':', $server, 2 );
+               $this->host = $parts[0];
+               $this->port = isset( $parts[1] ) ? $parts[1] : 80;
+       }
+
+       /**
+        * Open a socket if there isn't one open already, return it.
+        * Returns false on error.
+        */
+       protected function getSocket() {
+               if ( $this->socket !== null ) {
+                       return $this->socket;
+               }
+
+               $ip = $this->getIP();
+               if ( !$ip ) {
+                       $this->log( "DNS error" );
+                       $this->markDown();
+                       return false;
+               }
+               $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
+               socket_set_nonblock( $this->socket );
+               wfSuppressWarnings();
+               $ok = socket_connect( $this->socket, $ip, $this->port );
+               wfRestoreWarnings();
+               if ( !$ok ) {
+                       $error = socket_last_error( $this->socket );
+                       if ( $error !== self::EINPROGRESS ) {
+                               $this->log( "connection error: " . socket_strerror( $error ) );
+                               $this->markDown();
+                               return false;
+                       }
+               }
+
+               return $this->socket;
+       }
+
+       /**
+        * Get read socket array for select()
+        */
+       public function getReadSocketsForSelect() {
+               if ( $this->readState == 'idle' ) {
+                       return array();
+               }
+               $socket = $this->getSocket();
+               if ( $socket === false ) {
+                       return array();
+               }
+               return array( $socket );
+       }
+
+       /**
+        * Get write socket array for select()
+        */
+       public function getWriteSocketsForSelect() {
+               if ( !strlen( $this->writeBuffer ) ) {
+                       return array();
+               }
+               $socket = $this->getSocket();
+               if ( $socket === false ) {
+                       return array();
+               }
+               return array( $socket );
+       }
+
+       /** 
+        * Get the host's IP address.
+        * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
+        */
+       protected function getIP() {
+               if ( $this->ip === null ) {
+                       if ( IP::isIPv4( $this->host ) ) {
+                               $this->ip = $this->host;
+                       } elseif ( IP::isIPv6( $this->host ) ) {
+                               throw new MWException( '$wgSquidServers does not support IPv6' );
+                       } else {
+                               wfSuppressWarnings();
+                               $this->ip = gethostbyname( $this->host );
+                               if ( $this->ip === $this->host ) {
+                                       $this->ip = false;
+                               }
+                               wfRestoreWarnings();
+                       }
+               }
+               return $this->ip;
+       }
+
+       /**
+        * Close the socket and ignore any future purge requests.
+        * This is called if there is a protocol error.
+        */
+       protected function markDown() {
+               $this->close();
+               $this->socket = false;
+       }
+
+       /**
+        * Close the socket but allow it to be reopened for future purge requests
+        */
+       public function close() {
+               if ( $this->socket ) {
+                       wfSuppressWarnings();
+                       socket_set_block( $this->socket );
+                       socket_shutdown( $this->socket );
+                       socket_close( $this->socket );
+                       wfRestoreWarnings();
+               }
+               $this->socket = null;
+               $this->readBuffer = '';
+               // Write buffer is kept since it may contain a request for the next socket
+       }
+
+       /**
+        * Queue a purge operation
+        */
+       public function queuePurge( $url ) {
+               $url = str_replace( "\n", '', $url );
+               $this->requests[] = "PURGE $url HTTP/1.0\r\n" .
+                       "Connection: Keep-Alive\r\n" .
+                       "Proxy-Connection: Keep-Alive\r\n" .
+                       "User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
+               if ( $this->currentRequestIndex === null ) {
+                       $this->nextRequest();
+               }
+       }
+
+       public function isIdle() {
+               return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
+       }
+
+       /**
+        * Perform pending writes. Call this when socket_select() indicates that writing will not block.
+        */
+       public function doWrites() {
+               if ( !strlen( $this->writeBuffer ) ) {
+                       return;
+               }
+               $socket = $this->getSocket();
+               if ( !$socket ) {
+                       return;
+               }
+
+               if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
+                       $buf = $this->writeBuffer;
+                       $flags = MSG_EOR;
+               } else {
+                       $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
+                       $flags = 0;
+               }
+               wfSuppressWarnings();
+               $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
+               wfRestoreWarnings();
+
+               if ( $bytesSent === false ) {
+                       $error = socket_last_error( $socket );
+                       if ( $error != self::EAGAIN && $error != self::EINTR ) {
+                               $this->log( 'write error: ' . socket_strerror( $error ) );
+                               $this->markDown();
+                       }
+                       return;
+               }
+
+               $this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
+       }
+
+       /**
+        * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
+        */
+       public function doReads() {
+               $socket = $this->getSocket();
+               if ( !$socket ) {
+                       return;
+               }
+
+               $buf = '';
+               wfSuppressWarnings();
+               $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
+               wfRestoreWarnings();
+               if ( $bytesRead === false ) {
+                       $error = socket_last_error( $socket );
+                       if ( $error != self::EAGAIN && $error != self::EINTR ) {
+                               $this->log( 'read error: ' . socket_strerror( $error ) );
+                               $this->markDown();
+                               return;
+                       }
+               } elseif ( $bytesRead === 0 ) {
+                       // Assume EOF
+                       $this->close();
+                       return;
+               }
+
+               $this->readBuffer .= $buf;
+               while ( $this->socket && $this->processReadBuffer() === 'continue' );
+       }
+
+       protected function processReadBuffer() {
+               switch ( $this->readState ) {
+               case 'idle':
+                       return 'done';
+               case 'status':
+               case 'header':
+                       $lines = explode( "\r\n", $this->readBuffer, 2 );
+                       if ( count( $lines ) < 2 ) {
+                               return 'done';
+                       }
+                       if ( $this->readState == 'status' )  {
+                               $this->processStatusLine( $lines[0] );
+                       } else { // header
+                               $this->processHeaderLine( $lines[0] );
+                       }
+                       $this->readBuffer = $lines[1];
+                       return 'continue';
+               case 'body':
+                       if ( $this->bodyRemaining !== null ) {
+                               if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
+                                       $this->bodyRemaining -= strlen( $this->readBuffer );
+                                       $this->readBuffer = '';
+                                       return 'done';
+                               } else {
+                                       $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
+                                       $this->bodyRemaining = 0;
+                                       $this->nextRequest();
+                                       return 'continue';
+                               }
+                       } else {
+                               // No content length, read all data to EOF
+                               $this->readBuffer = '';
+                               return 'done';
+                       }
+               default:
+                       throw new MWException( __METHOD__.': unexpected state' );
+               }
+       }
+
+       protected function processStatusLine( $line ) {
+               if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
+                       $this->log( 'invalid status line' );
+                       $this->markDown();
+                       return;
+               }
+               list( $all, $major, $minor, $status, $reason ) = $m;
+               $status = intval( $status );
+               if ( $status !== 200 && $status !== 404 ) {
+                       $this->log( "unexpected status code: $status $reason" );
+                       $this->markDown();
+                       return;
+               }
+               $this->readState = 'header';
+       }
+
+       protected function processHeaderLine( $line ) {
+               if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
+                       $this->bodyRemaining = intval( $m[1] );
+               } elseif ( $line === '' ) {
+                       $this->readState = 'body';
+               }
+       }
+
+       protected function nextRequest() {
+               if ( $this->currentRequestIndex !== null ) {
+                       unset( $this->requests[$this->currentRequestIndex] );
+               }
+               if ( count( $this->requests ) ) {
+                       $this->readState = 'status';
+                       $this->currentRequestIndex = key( $this->requests );
+                       $this->writeBuffer = $this->requests[$this->currentRequestIndex];
+               } else {
+                       $this->readState = 'idle';
+                       $this->currentRequestIndex = null;
+                       $this->writeBuffer = '';
+               }
+               $this->bodyRemaining = null;
+       }
+
+       protected function log( $msg ) {
+               wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" );
+       }
+}
+
+class SquidPurgeClientPool {
+       var $clients = array();
+       var $timeout = 5;
+
+       function __construct( $options = array() ) {
+               if ( isset( $options['timeout'] ) ) {
+                       $this->timeout = $options['timeout'];
+               }
+       }
+
+       public function addClient( $client ) {
+               $this->clients[] = $client;
+       }
+
+       public function run() {
+               $done = false;
+               $startTime = microtime( true );
+               while ( !$done ) {
+                       $readSockets = $writeSockets = array();
+                       foreach ( $this->clients as $clientIndex => $client ) {
+                               $sockets = $client->getReadSocketsForSelect();
+                               foreach ( $sockets as $i => $socket ) {
+                                       $readSockets["$clientIndex/$i"] = $socket;
+                               }
+                               $sockets = $client->getWriteSocketsForSelect();
+                               foreach ( $sockets as $i => $socket ) {
+                                       $writeSockets["$clientIndex/$i"] = $socket;
+                               }
+                       }
+                       if ( !count( $readSockets ) && !count( $writeSockets ) ) {
+                               break;
+                       }
+                       $exceptSockets = null;
+                       $timeout = min( $startTime + $this->timeout - microtime( true ), 1 );
+                       wfSuppressWarnings();
+                       $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout );
+                       wfRestoreWarnings();
+                       if ( $numReady === false ) {
+                               wfDebugLog( 'squid', __METHOD__.': Error in stream_select: ' . 
+                                       socket_strerror( socket_last_error() ) . "\n" );
+                               break;
+                       }
+                       // Check for timeout, use 1% tolerance since we aimed at having socket_select()
+                       // exit at precisely the overall timeout
+                       if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
+                               wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" );
+                               break;
+                       } elseif ( !$numReady ) {
+                               continue;
+                       }
+
+                       foreach ( $readSockets as $key => $socket ) {
+                               list( $clientIndex, $i ) = explode( '/', $key );
+                               $client = $this->clients[$clientIndex];
+                               $client->doReads();
+                       }
+                       foreach ( $writeSockets as $key => $socket ) {
+                               list( $clientIndex, $i ) = explode( '/', $key );
+                               $client = $this->clients[$clientIndex];
+                               $client->doWrites();
+                       }
+
+                       $done = true;
+                       foreach ( $this->clients as $client ) {
+                               if ( !$client->isIdle() ) {
+                                       $done = false;
+                               }
+                       }
+               }
+               foreach ( $this->clients as $client ) {
+                       $client->close();
+               }
+       }
+}
index 1323ffa..6651771 100644 (file)
@@ -81,14 +81,14 @@ class SquidUpdate {
        XXX report broken Squids per mail or log */
 
        static function purge( $urlArr ) {
-               global $wgSquidServers, $wgHTCPMulticastAddress, $wgHTCPPort, $wgSquidResponseLimit;
+               global $wgSquidServers, $wgHTCPMulticastAddress, $wgHTCPPort;
 
                /*if ( (@$wgSquidServers[0]) == 'echo' ) {
                        echo implode("<br />\n", $urlArr) . "<br />\n";
                        return;
                }*/
 
-               if( empty( $urlArr ) ) {
+               if( !$urlArr ) {
                        return;
                }
 
@@ -98,105 +98,26 @@ class SquidUpdate {
 
                wfProfileIn( __METHOD__ );
 
-               $maxsocketspersquid = 8; //  socket cap per Squid
-               $urlspersocket = 400; // 400 seems to be a good tradeoff, opening a socket takes a while
-               $firsturl = SquidUpdate::expand( $urlArr[0] );
-               unset($urlArr[0]);
-               $urlArr = array_values($urlArr);
-               $sockspersq =  max(ceil(count($urlArr) / $urlspersocket ),1);
-               if ($sockspersq == 1) {
-                       /* the most common case */
-                       $urlspersocket = count($urlArr);
-               } else if ($sockspersq > $maxsocketspersquid ) {
-                       $urlspersocket = ceil(count($urlArr) / $maxsocketspersquid);
-                       $sockspersq = $maxsocketspersquid;
+               $maxSocketsPerSquid = 8; //  socket cap per Squid
+               $urlsPerSocket = 400; // 400 seems to be a good tradeoff, opening a socket takes a while
+               $socketsPerSquid = ceil( count( $urlArr ) / $urlsPerSocket );
+               if ( $socketsPerSquid > $maxSocketsPerSquid ) {
+                       $socketsPerSquid = $maxSocketsPerSquid;
                }
-               $totalsockets = count($wgSquidServers) * $sockspersq;
-               $sockets = Array();
 
-               /* this sets up the sockets and tests the first socket for each server. */
-               for ($ss=0;$ss < count($wgSquidServers);$ss++) {
-                       $failed = false;
-                       $so = 0;
-                       while ($so < $sockspersq && !$failed) {
-                               if ($so == 0) {
-                                       /* first socket for this server, do the tests */
-                                       @list($server, $port) = explode(':', $wgSquidServers[$ss]);
-                                       if(!isset($port)) $port = 80;
-                                       #$this->debug("Opening socket to $server:$port");
-                                       $error = $errstr = false;
-                                       $socket = @fsockopen($server, $port, $error, $errstr, 3);
-                                       #$this->debug("\n");
-                                       if (!$socket) {
-                                               $failed = true;
-                                               $totalsockets -= $sockspersq;
-                                       } else {
-                                               $msg = 'PURGE ' . $firsturl . " HTTP/1.0\r\n".
-                                               "Connection: Keep-Alive\r\n\r\n";
-                                               #$this->debug($msg);
-                                               @fputs($socket,$msg);
-                                               #$this->debug("...");
-                                               $res = @fread($socket,512);
-                                               #$this->debug("\n");
-                                               /* Squid only returns http headers with 200 or 404 status,
-                                               if there's more returned something's wrong */
-                                               if (strlen($res) > $wgSquidResponseLimit) {
-                                                       fclose($socket);
-                                                       $failed = true;
-                                                       $totalsockets -= $sockspersq;
-                                               } else {
-                                                       @stream_set_blocking($socket,false);
-                                                       $sockets[] = $socket;
-                                               }
-                                       }
-                               } else {
-                                       /* open the remaining sockets for this server */
-                                       list($server, $port) = explode(':', $wgSquidServers[$ss]);
-                                       if(!isset($port)) $port = 80;
-                                       $socket = @fsockopen($server, $port, $error, $errstr, 2);
-                                       @stream_set_blocking($socket,false);
-                                       $sockets[] = $socket;
+               $pool = new SquidPurgeClientPool;
+               $chunks = array_chunk( $urlArr, ceil( count( $urlArr ) / $socketsPerSquid ) );
+               foreach ( $wgSquidServers as $server ) {
+                       foreach ( $chunks as $chunk ) {
+                               $client = new SquidPurgeClient( $server );
+                               foreach ( $chunk as $url ) {
+                                       $client->queuePurge( $url );
                                }
-                               $so++;
+                               $pool->addClient( $client );
                        }
                }
+               $pool->run();
 
-               if ($urlspersocket > 0) {
-                       /* now do the heavy lifting. The fread() relies on Squid returning only the headers */
-                       for ($r=0;$r < $urlspersocket;$r++) {
-                               for ($s=0;$s < $totalsockets;$s++) {
-                                       if($r != 0) {
-                                               $res = '';
-                                               $esc = 0;
-                                               while (strlen($res) < 100 && $esc < 200  ) {
-                                                       $res .= @fread($sockets[$s],512);
-                                                       $esc++;
-                                                       usleep(20);
-                                               }
-                                       }
-                                       $urindex = $r + $urlspersocket * ($s - $sockspersq * floor($s / $sockspersq));
-                                       $url = SquidUpdate::expand( $urlArr[$urindex] );
-                                       $msg = 'PURGE ' . $url . " HTTP/1.0\r\n".
-                                       "Connection: Keep-Alive\r\n\r\n";
-                                       #$this->debug($msg);
-                                       @fputs($sockets[$s],$msg);
-                                       #$this->debug("\n");
-                               }
-                       }
-               }
-               #$this->debug("Reading response...");
-               foreach ($sockets as $socket) {
-                       $res = '';
-                       $esc = 0;
-                       while (strlen($res) < 100 && $esc < 200  ) {
-                               $res .= @fread($socket,1024);
-                               $esc++;
-                               usleep(20);
-                       }
-
-                       @fclose($socket);
-               }
-               #$this->debug("\n");
                wfProfileOut( __METHOD__ );
        }
 
@@ -259,13 +180,6 @@ class SquidUpdate {
                wfProfileOut( __METHOD__ );
        }
 
-       function debug( $text ) {
-               global $wgDebugSquid;
-               if ( $wgDebugSquid ) {
-                       wfDebug( $text );
-               }
-       }
-
        /**
         * Expand local URLs to fully-qualified URLs using the internal protocol
         * and host defined in $wgInternalServer. Input that's already fully-