Bug 35034 - moved autocomment-prefix between the prefix and the arrow. Follow up...
[lhc/web/wiklou.git] / includes / SquidPurgeClient.php
1 <?php
2 /**
3 * An HTTP 1.0 client built for the purposes of purging Squid and Varnish.
4 * Uses asynchronous I/O, allowing purges to be done in a highly parallel
5 * manner.
6 *
7 * Could be replaced by curl_multi_exec() or some such.
8 */
9 class SquidPurgeClient {
10 var $host, $port, $ip;
11
12 var $readState = 'idle';
13 var $writeBuffer = '';
14 var $requests = array();
15 var $currentRequestIndex;
16
17 const EINTR = 4;
18 const EAGAIN = 11;
19 const EINPROGRESS = 115;
20 const BUFFER_SIZE = 8192;
21
22 /**
23 * The socket resource, or null for unconnected, or false for disabled due to error
24 */
25 var $socket;
26
27 public function __construct( $server, $options = array() ) {
28 $parts = explode( ':', $server, 2 );
29 $this->host = $parts[0];
30 $this->port = isset( $parts[1] ) ? $parts[1] : 80;
31 }
32
33 /**
34 * Open a socket if there isn't one open already, return it.
35 * Returns false on error.
36 *
37 * @return bool|resource
38 */
39 protected function getSocket() {
40 if ( $this->socket !== null ) {
41 return $this->socket;
42 }
43
44 $ip = $this->getIP();
45 if ( !$ip ) {
46 $this->log( "DNS error" );
47 $this->markDown();
48 return false;
49 }
50 $this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
51 socket_set_nonblock( $this->socket );
52 wfSuppressWarnings();
53 $ok = socket_connect( $this->socket, $ip, $this->port );
54 wfRestoreWarnings();
55 if ( !$ok ) {
56 $error = socket_last_error( $this->socket );
57 if ( $error !== self::EINPROGRESS ) {
58 $this->log( "connection error: " . socket_strerror( $error ) );
59 $this->markDown();
60 return false;
61 }
62 }
63
64 return $this->socket;
65 }
66
67 /**
68 * Get read socket array for select()
69 * @return array
70 */
71 public function getReadSocketsForSelect() {
72 if ( $this->readState == 'idle' ) {
73 return array();
74 }
75 $socket = $this->getSocket();
76 if ( $socket === false ) {
77 return array();
78 }
79 return array( $socket );
80 }
81
82 /**
83 * Get write socket array for select()
84 * @return array
85 */
86 public function getWriteSocketsForSelect() {
87 if ( !strlen( $this->writeBuffer ) ) {
88 return array();
89 }
90 $socket = $this->getSocket();
91 if ( $socket === false ) {
92 return array();
93 }
94 return array( $socket );
95 }
96
97 /**
98 * Get the host's IP address.
99 * Does not support IPv6 at present due to the lack of a convenient interface in PHP.
100 */
101 protected function getIP() {
102 if ( $this->ip === null ) {
103 if ( IP::isIPv4( $this->host ) ) {
104 $this->ip = $this->host;
105 } elseif ( IP::isIPv6( $this->host ) ) {
106 throw new MWException( '$wgSquidServers does not support IPv6' );
107 } else {
108 wfSuppressWarnings();
109 $this->ip = gethostbyname( $this->host );
110 if ( $this->ip === $this->host ) {
111 $this->ip = false;
112 }
113 wfRestoreWarnings();
114 }
115 }
116 return $this->ip;
117 }
118
119 /**
120 * Close the socket and ignore any future purge requests.
121 * This is called if there is a protocol error.
122 */
123 protected function markDown() {
124 $this->close();
125 $this->socket = false;
126 }
127
128 /**
129 * Close the socket but allow it to be reopened for future purge requests
130 */
131 public function close() {
132 if ( $this->socket ) {
133 wfSuppressWarnings();
134 socket_set_block( $this->socket );
135 socket_shutdown( $this->socket );
136 socket_close( $this->socket );
137 wfRestoreWarnings();
138 }
139 $this->socket = null;
140 $this->readBuffer = '';
141 // Write buffer is kept since it may contain a request for the next socket
142 }
143
144 /**
145 * Queue a purge operation
146 *
147 * @param $url string
148 */
149 public function queuePurge( $url ) {
150 $url = SquidUpdate::expand( str_replace( "\n", '', $url ) );
151 $this->requests[] = "PURGE $url HTTP/1.0\r\n" .
152 "Connection: Keep-Alive\r\n" .
153 "Proxy-Connection: Keep-Alive\r\n" .
154 "User-Agent: " . Http::userAgent() . ' ' . __CLASS__ . "\r\n\r\n";
155 if ( $this->currentRequestIndex === null ) {
156 $this->nextRequest();
157 }
158 }
159
160 /**
161 * @return bool
162 */
163 public function isIdle() {
164 return strlen( $this->writeBuffer ) == 0 && $this->readState == 'idle';
165 }
166
167 /**
168 * Perform pending writes. Call this when socket_select() indicates that writing will not block.
169 */
170 public function doWrites() {
171 if ( !strlen( $this->writeBuffer ) ) {
172 return;
173 }
174 $socket = $this->getSocket();
175 if ( !$socket ) {
176 return;
177 }
178
179 if ( strlen( $this->writeBuffer ) <= self::BUFFER_SIZE ) {
180 $buf = $this->writeBuffer;
181 $flags = MSG_EOR;
182 } else {
183 $buf = substr( $this->writeBuffer, 0, self::BUFFER_SIZE );
184 $flags = 0;
185 }
186 wfSuppressWarnings();
187 $bytesSent = socket_send( $socket, $buf, strlen( $buf ), $flags );
188 wfRestoreWarnings();
189
190 if ( $bytesSent === false ) {
191 $error = socket_last_error( $socket );
192 if ( $error != self::EAGAIN && $error != self::EINTR ) {
193 $this->log( 'write error: ' . socket_strerror( $error ) );
194 $this->markDown();
195 }
196 return;
197 }
198
199 $this->writeBuffer = substr( $this->writeBuffer, $bytesSent );
200 }
201
202 /**
203 * Read some data. Call this when socket_select() indicates that the read buffer is non-empty.
204 */
205 public function doReads() {
206 $socket = $this->getSocket();
207 if ( !$socket ) {
208 return;
209 }
210
211 $buf = '';
212 wfSuppressWarnings();
213 $bytesRead = socket_recv( $socket, $buf, self::BUFFER_SIZE, 0 );
214 wfRestoreWarnings();
215 if ( $bytesRead === false ) {
216 $error = socket_last_error( $socket );
217 if ( $error != self::EAGAIN && $error != self::EINTR ) {
218 $this->log( 'read error: ' . socket_strerror( $error ) );
219 $this->markDown();
220 return;
221 }
222 } elseif ( $bytesRead === 0 ) {
223 // Assume EOF
224 $this->close();
225 return;
226 }
227
228 $this->readBuffer .= $buf;
229 while ( $this->socket && $this->processReadBuffer() === 'continue' );
230 }
231
232 /**
233 * @throws MWException
234 * @return string
235 */
236 protected function processReadBuffer() {
237 switch ( $this->readState ) {
238 case 'idle':
239 return 'done';
240 case 'status':
241 case 'header':
242 $lines = explode( "\r\n", $this->readBuffer, 2 );
243 if ( count( $lines ) < 2 ) {
244 return 'done';
245 }
246 if ( $this->readState == 'status' ) {
247 $this->processStatusLine( $lines[0] );
248 } else { // header
249 $this->processHeaderLine( $lines[0] );
250 }
251 $this->readBuffer = $lines[1];
252 return 'continue';
253 case 'body':
254 if ( $this->bodyRemaining !== null ) {
255 if ( $this->bodyRemaining > strlen( $this->readBuffer ) ) {
256 $this->bodyRemaining -= strlen( $this->readBuffer );
257 $this->readBuffer = '';
258 return 'done';
259 } else {
260 $this->readBuffer = substr( $this->readBuffer, $this->bodyRemaining );
261 $this->bodyRemaining = 0;
262 $this->nextRequest();
263 return 'continue';
264 }
265 } else {
266 // No content length, read all data to EOF
267 $this->readBuffer = '';
268 return 'done';
269 }
270 default:
271 throw new MWException( __METHOD__.': unexpected state' );
272 }
273 }
274
275 /**
276 * @param $line
277 * @return
278 */
279 protected function processStatusLine( $line ) {
280 if ( !preg_match( '!^HTTP/(\d+)\.(\d+) (\d{3}) (.*)$!', $line, $m ) ) {
281 $this->log( 'invalid status line' );
282 $this->markDown();
283 return;
284 }
285 list( , , , $status, $reason ) = $m;
286 $status = intval( $status );
287 if ( $status !== 200 && $status !== 404 ) {
288 $this->log( "unexpected status code: $status $reason" );
289 $this->markDown();
290 return;
291 }
292 $this->readState = 'header';
293 }
294
295 /**
296 * @param $line string
297 */
298 protected function processHeaderLine( $line ) {
299 if ( preg_match( '/^Content-Length: (\d+)$/i', $line, $m ) ) {
300 $this->bodyRemaining = intval( $m[1] );
301 } elseif ( $line === '' ) {
302 $this->readState = 'body';
303 }
304 }
305
306 protected function nextRequest() {
307 if ( $this->currentRequestIndex !== null ) {
308 unset( $this->requests[$this->currentRequestIndex] );
309 }
310 if ( count( $this->requests ) ) {
311 $this->readState = 'status';
312 $this->currentRequestIndex = key( $this->requests );
313 $this->writeBuffer = $this->requests[$this->currentRequestIndex];
314 } else {
315 $this->readState = 'idle';
316 $this->currentRequestIndex = null;
317 $this->writeBuffer = '';
318 }
319 $this->bodyRemaining = null;
320 }
321
322 protected function log( $msg ) {
323 wfDebugLog( 'squid', __CLASS__." ($this->host): $msg\n" );
324 }
325 }
326
327 class SquidPurgeClientPool {
328
329 /**
330 * @var array of SquidPurgeClient
331 */
332 var $clients = array();
333 var $timeout = 5;
334
335 function __construct( $options = array() ) {
336 if ( isset( $options['timeout'] ) ) {
337 $this->timeout = $options['timeout'];
338 }
339 }
340
341 /**
342 * @param $client SquidPurgeClient
343 * @return void
344 */
345 public function addClient( $client ) {
346 $this->clients[] = $client;
347 }
348
349 public function run() {
350 $done = false;
351 $startTime = microtime( true );
352 while ( !$done ) {
353 $readSockets = $writeSockets = array();
354 foreach ( $this->clients as $clientIndex => $client ) {
355 $sockets = $client->getReadSocketsForSelect();
356 foreach ( $sockets as $i => $socket ) {
357 $readSockets["$clientIndex/$i"] = $socket;
358 }
359 $sockets = $client->getWriteSocketsForSelect();
360 foreach ( $sockets as $i => $socket ) {
361 $writeSockets["$clientIndex/$i"] = $socket;
362 }
363 }
364 if ( !count( $readSockets ) && !count( $writeSockets ) ) {
365 break;
366 }
367 $exceptSockets = null;
368 $timeout = min( $startTime + $this->timeout - microtime( true ), 1 );
369 wfSuppressWarnings();
370 $numReady = socket_select( $readSockets, $writeSockets, $exceptSockets, $timeout );
371 wfRestoreWarnings();
372 if ( $numReady === false ) {
373 wfDebugLog( 'squid', __METHOD__.': Error in stream_select: ' .
374 socket_strerror( socket_last_error() ) . "\n" );
375 break;
376 }
377 // Check for timeout, use 1% tolerance since we aimed at having socket_select()
378 // exit at precisely the overall timeout
379 if ( microtime( true ) - $startTime > $this->timeout * 0.99 ) {
380 wfDebugLog( 'squid', __CLASS__.": timeout ({$this->timeout}s)\n" );
381 break;
382 } elseif ( !$numReady ) {
383 continue;
384 }
385
386 foreach ( $readSockets as $key => $socket ) {
387 list( $clientIndex, ) = explode( '/', $key );
388 $client = $this->clients[$clientIndex];
389 $client->doReads();
390 }
391 foreach ( $writeSockets as $key => $socket ) {
392 list( $clientIndex, ) = explode( '/', $key );
393 $client = $this->clients[$clientIndex];
394 $client->doWrites();
395 }
396
397 $done = true;
398 foreach ( $this->clients as $client ) {
399 if ( !$client->isIdle() ) {
400 $done = false;
401 }
402 }
403 }
404 foreach ( $this->clients as $client ) {
405 $client->close();
406 }
407 }
408 }