664f59a04789a0b94ca8be2ef75f5baed572a46e
[lhc/web/wiklou.git] / includes / SquidPurgeClient.php
1 <?php
/**
 * An HTTP 1.0 client built for the purposes of purging Squid and Varnish.
 * Uses asynchronous I/O, allowing purges to be done in a highly parallel
 * manner.
 *
 * Requests are queued with queuePurge() and sent one at a time; the response
 * to the current request is parsed by a small state machine (see $readState)
 * before the next request is written.
 *
 * Could be replaced by curl_multi_exec() or some such.
 */
class SquidPurgeClient {
	public $host, $port, $ip;

	/** Response parser state: 'idle', 'status', 'header' or 'body' */
	public $readState = 'idle';

	/** Bytes of the current request that have not been sent yet */
	public $writeBuffer = '';

	/** Bytes received from the socket that the parser has not consumed yet */
	public $readBuffer = '';

	/** Raw request strings waiting to be sent, in queue order */
	public $requests = array();

	/** Key in $requests of the request currently in flight, or null if none */
	public $currentRequestIndex;

	/** Body bytes still expected, or null if no Content-Length header was seen */
	public $bodyRemaining;

	// Socket error numbers that are treated as "retry later" rather than fatal
	const EINTR = 4;
	const EAGAIN = 11;
	const EINPROGRESS = 115;
	const BUFFER_SIZE = 8192;

	/**
	 * The socket resource, or null for unconnected, or false for disabled due to error
	 */
	public $socket;

	/**
	 * @param $server string Host name or IPv4 address, optionally followed by ":port"
	 * @param $options array Currently unused
	 */
	public function __construct( $server, $options = array() ) {
		$parts = explode( ':', $server, 2 );
		$this->host = $parts[0];
		// Keep the port an integer whether it was given explicitly or defaulted
		$this->port = isset( $parts[1] ) ? (int)$parts[1] : 80;
	}

	/**
	 * Open a socket if there isn't one open already, return it.
	 * Returns false on error.
	 */
	protected function getSocket() {
		if ( $this->socket !== null ) {
			return $this->socket;
		}

		$ip = $this->getIP();
		if ( !$ip ) {
			$this->log( "DNS error" );
			$this->markDown();
			return false;
		}

		$socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
		if ( !$socket ) {
			// Without this check the failed value would be passed on to
			// socket_set_nonblock() and socket_connect()
			$this->log( "socket creation error: " . socket_strerror( socket_last_error() ) );
			$this->markDown();
			return false;
		}
		$this->socket = $socket;
		socket_set_nonblock( $this->socket );
		wfSuppressWarnings();
		$ok = socket_connect( $this->socket, $ip, $this->port );
		wfRestoreWarnings();
		if ( !$ok ) {
			// A non-blocking connect normally reports EINPROGRESS; anything
			// else is a real failure
			$error = socket_last_error( $this->socket );
			if ( $error !== self::EINPROGRESS ) {
				$this->log( "connection error: " . socket_strerror( $error ) );
				$this->markDown();
				return false;
			}
		}

		return $this->socket;
	}

	/**
	 * Get read socket array for select()
	 *
	 * @return array Empty when no response is awaited or the client is down
	 */
	public function getReadSocketsForSelect() {
		if ( $this->readState == 'idle' ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get write socket array for select()
	 *
	 * @return array Empty when there is nothing to send or the client is down
	 */
	public function getWriteSocketsForSelect() {
		if ( !strlen( $this->writeBuffer ) ) {
			return array();
		}
		$socket = $this->getSocket();
		if ( $socket === false ) {
			return array();
		}
		return array( $socket );
	}

	/**
	 * Get the host's IP address.
	 * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
	 *
	 * The result is cached in $this->ip; false is cached when resolution fails.
	 */
	protected function getIP() {
		if ( $this->ip === null ) {
			if ( IP::isIPv4( $this->host ) ) {
				$this->ip = $this->host;
			} elseif ( IP::isIPv6( $this->host ) ) {
				throw new MWException( '$wgSquidServers does not support IPv6' );
			} else {
				wfSuppressWarnings();
				$resolved = gethostbyname( $this->host );
				wfRestoreWarnings();
				// gethostbyname() returns its argument unchanged on failure
				$this->ip = ( $resolved === $this->host ) ? false : $resolved;
			}
		}
		return $this->ip;
	}

	/**
	 * Close the socket and ignore any future purge requests.
	 * This is called if there is a protocol error.
	 */
	protected function markDown() {
		$this->close();
		$this->socket = false;
	}

	/**
	 * Close the socket but allow it to be reopened for future purge requests
	 */
	public function close() {
		if ( $this->socket ) {
			wfSuppressWarnings();
			socket_set_block( $this->socket );
			socket_shutdown( $this->socket );
			socket_close( $this->socket );
			wfRestoreWarnings();
		}
		$this->socket = null;
		$this->readBuffer = '';
		// Write buffer is kept since it may contain a request for the next socket
	}

	/**
	 * Queue a purge operation
	 *
	 * @param $url string
	 */
	public function queuePurge( $url ) {
		// Strip both CR and LF so the URL cannot break out of the request line
		$url = str_replace( array( "\r", "\n" ), '', $url );
		$this->requests[] = "PURGE $url HTTP/1.0\r\n" .
			"Connection: Keep-Alive\r\n" .
			"Proxy-Connection: Keep-Alive\r\n" .
			"User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
		if ( $this->currentRequestIndex === null ) {
			$this->nextRequest();
		}
	}

	/**
	 * @return bool True when nothing is waiting to be sent and no response is awaited
	 */
	public function isIdle() {
		return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
	}

	/**
	 * Perform pending writes. Call this when socket_select() indicates that writing will not block.
	 */
	public function doWrites() {
		if ( !strlen( $this->writeBuffer ) ) {
			return;
		}
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		// Send at most BUFFER_SIZE bytes per call
		if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
			$buf = $this->writeBuffer;
			$flags = MSG_EOR;
		} else {
			$buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
			$flags = 0;
		}
		wfSuppressWarnings();
		$bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
		wfRestoreWarnings();

		if ( $bytesSent === false ) {
			$error = socket_last_error( $socket );
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'write error: ' . socket_strerror( $error ) );
				$this->markDown();
			}
			return;
		}

		// Cast because substr() may return false instead of '' on older PHP
		$this->writeBuffer = (string)substr( $this->writeBuffer, $bytesSent );
	}

	/**
	 * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
	 */
	public function doReads() {
		$socket = $this->getSocket();
		if ( !$socket ) {
			return;
		}

		$buf = '';
		wfSuppressWarnings();
		$bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
		wfRestoreWarnings();
		if ( $bytesRead === false ) {
			$error = socket_last_error( $socket );
			if ( $error != self::EAGAIN && $error != self::EINTR ) {
				$this->log( 'read error: ' . socket_strerror( $error ) );
				$this->markDown();
				return;
			}
		} elseif ( $bytesRead === 0 ) {
			// EOF. A body without Content-Length is terminated by the server
			// closing the connection, so in that case the current response is
			// complete and the next request can start on a fresh socket.
			$bodyEndedByEof = $this->readState == 'body' && $this->bodyRemaining === null;
			$this->close();
			if ( $bodyEndedByEof ) {
				$this->nextRequest();
			}
			return;
		}

		// $buf may be null after a retryable error; treat it as no data
		$this->readBuffer .= (string)$buf;
		while ( $this->socket && $this->processReadBuffer() === 'continue' );
	}

	/**
	 * Consume one unit (a line or a body chunk) from the read buffer.
	 *
	 * @return string 'continue' if more may be parsed right away, 'done' if more data is needed
	 */
	protected function processReadBuffer() {
		switch ( $this->readState ) {
		case 'idle':
			return 'done';
		case 'status':
		case 'header':
			$lines = explode( "\r\n", $this->readBuffer, 2 );
			if ( count( $lines ) < 2 ) {
				// No complete line buffered yet
				return 'done';
			}
			if ( $this->readState == 'status' ) {
				$this->processStatusLine( $lines[0] );
			} else { // header
				$this->processHeaderLine( $lines[0] );
			}
			$this->readBuffer = $lines[1];
			return 'continue';
		case 'body':
			if ( $this->bodyRemaining !== null ) {
				if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
					// Whole buffer belongs to the body; wait for the rest
					$this->bodyRemaining -= strlen( $this->readBuffer );
					$this->readBuffer = '';
					return 'done';
				} else {
					// Body complete; anything left belongs to the next response
					$this->readBuffer = (string)substr( $this->readBuffer, $this->bodyRemaining );
					$this->bodyRemaining = 0;
					$this->nextRequest();
					return 'continue';
				}
			} else {
				// No content length, read all data to EOF
				$this->readBuffer = '';
				return 'done';
			}
		default:
			throw new MWException( __METHOD__.': unexpected state' );
		}
	}

	/**
	 * Validate the HTTP status line and move on to header parsing.
	 * Anything other than a well-formed 200 or 404 response marks the client down.
	 *
	 * @param $line string
	 */
	protected function processStatusLine( $line ) {
		if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
			$this->log( 'invalid status line' );
			$this->markDown();
			return;
		}
		list( , , , $status, $reason ) = $m;
		$status = intval( $status );
		if ( $status !== 200 && $status !== 404 ) {
			$this->log( "unexpected status code: $status $reason" );
			$this->markDown();
			return;
		}
		$this->readState = 'header';
	}

	/**
	 * Handle one response header line: record Content-Length, and switch to
	 * body parsing on the empty line that ends the headers.
	 *
	 * @param $line string
	 */
	protected function processHeaderLine( $line ) {
		// Whitespace around the value is optional, so do not require exactly one space
		if ( preg_match( '/^Content-Length:\s*(\d+)\s*$/i', $line, $m ) ) {
			$this->bodyRemaining = intval( $m[1] );
		} elseif ( $line === '' ) {
			$this->readState = 'body';
		}
	}

	/**
	 * Drop the finished request, if any, and load the next queued one into
	 * the write buffer, or go idle when the queue is empty.
	 */
	protected function nextRequest() {
		if ( $this->currentRequestIndex !== null ) {
			unset( $this->requests[$this->currentRequestIndex] );
		}
		if ( count( $this->requests ) ) {
			$this->readState = 'status';
			// Rewind so key() yields the oldest queued request regardless of
			// where the array's internal pointer was left
			reset( $this->requests );
			$this->currentRequestIndex = key( $this->requests );
			$this->writeBuffer = $this->requests[$this->currentRequestIndex];
		} else {
			$this->readState = 'idle';
			$this->currentRequestIndex = null;
			$this->writeBuffer = '';
		}
		$this->bodyRemaining = null;
	}

	/**
	 * Write a message to the 'squid' debug log, prefixed with class and host.
	 *
	 * @param $msg string
	 */
	protected function log( $msg ) {
		wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" );
	}
}
311
/**
 * Drives a set of SquidPurgeClient objects with socket_select() until all of
 * them are idle, none has a socket left to wait on, or the timeout expires.
 */
class SquidPurgeClientPool {

	/**
	 * @var array of SquidPurgeClient
	 */
	public $clients = array();

	/** Overall time budget for run(), in seconds */
	public $timeout = 5;

	/**
	 * @param $options array Recognised key: 'timeout' (seconds)
	 */
	public function __construct( $options = array() ) {
		if ( isset( $options['timeout'] ) ) {
			$this->timeout = $options['timeout'];
		}
	}

	/**
	 * @param $client SquidPurgeClient
	 * @return void
	 */
	public function addClient( $client ) {
		$this->clients[] = $client;
	}

	/**
	 * Run the select loop until every client is idle, no client has a socket
	 * to wait on, select fails, or the overall timeout is reached. All clients
	 * are closed before returning.
	 */
	public function run() {
		$done = false;
		$startTime = microtime( true );
		while ( !$done ) {
			// Collect the sockets each client wants to wait on, keyed so the
			// owning client can be recovered after select
			$readSockets = $writeSockets = array();
			foreach ( $this->clients as $clientIndex => $client ) {
				$sockets = $client->getReadSocketsForSelect();
				foreach ( $sockets as $i => $socket ) {
					$readSockets["$clientIndex/$i"] = $socket;
				}
				$sockets = $client->getWriteSocketsForSelect();
				foreach ( $sockets as $i => $socket ) {
					$writeSockets["$clientIndex/$i"] = $socket;
				}
			}
			if ( !count( $readSockets ) && !count( $writeSockets ) ) {
				break;
			}
			$exceptSockets = null;

			// Wait at most one second, and never past the overall deadline.
			// socket_select() takes integer seconds plus microseconds, so a
			// fractional wait has to be split; passing it as seconds alone
			// would truncate to zero and spin the loop.
			$wait = max( 0, min( $startTime + $this->timeout - microtime( true ), 1 ) );
			$waitSec = (int)floor( $wait );
			$waitUsec = (int)( ( $wait - $waitSec ) * 1000000 );
			wfSuppressWarnings();
			$numReady = socket_select( $readSockets, $writeSockets, $exceptSockets,
				$waitSec, $waitUsec );
			wfRestoreWarnings();
			if ( $numReady === false ) {
				wfDebugLog( 'squid', __METHOD__.': Error in socket_select: ' .
					socket_strerror( socket_last_error() ) . "\n" );
				break;
			}
			// Check for timeout, use 1% tolerance since we aimed at having socket_select()
			// exit at precisely the overall timeout
			if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
				wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" );
				break;
			} elseif ( !$numReady ) {
				continue;
			}

			// After select the arrays hold only the ready sockets
			foreach ( $readSockets as $key => $socket ) {
				list( $clientIndex, ) = explode( '/', $key );
				$client = $this->clients[$clientIndex];
				$client->doReads();
			}
			foreach ( $writeSockets as $key => $socket ) {
				list( $clientIndex, ) = explode( '/', $key );
				$client = $this->clients[$clientIndex];
				$client->doWrites();
			}

			$done = true;
			foreach ( $this->clients as $client ) {
				if ( !$client->isIdle() ) {
					$done = false;
					break;
				}
			}
		}
		foreach ( $this->clients as $client ) {
			$client->close();
		}
	}
}