Remove wfDie() that Ariel keeps trying to resurrect :)
[lhc/web/wiklou.git] / maintenance / dumpTextPass.php
1 <?php
2 /**
3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
4 *
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
22 *
23 * @file
24 * @ingroup Maintenance
25 */
26
27 $originalDir = getcwd();
28
29 require_once( dirname( __FILE__ ) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
31
32 /**
33 * @ingroup Maintenance
34 */
35 class TextPassDumper extends BackupDumper {
36 var $prefetch = null;
37 var $input = "php://stdin";
38 var $history = WikiExporter::FULL;
39 var $fetchCount = 0;
40 var $prefetchCount = 0;
41 var $prefetchCountLast = 0;
42 var $fetchCountLast = 0;
43
44 var $failures = 0;
45 var $maxFailures = 5;
46 var $failedTextRetrievals = 0;
47 var $maxConsecutiveFailedTextRetrievals = 200;
48 var $failureTimeout = 5; // Seconds to sleep after db failure
49
50 var $php = "php";
51 var $spawn = false;
52 var $spawnProc = false;
53 var $spawnWrite = false;
54 var $spawnRead = false;
55 var $spawnErr = false;
56
57 var $xmlwriterobj = false;
58
59 # when we spend more than maxTimeAllowed seconds on this run, we continue
60 # processing until we write out the next complete page, then save output file(s),
61 # rename it/them and open new one(s)
62 var $maxTimeAllowed = 0; // 0 = no limit
63 var $timeExceeded = false;
64 var $firstPageWritten = false;
65 var $lastPageWritten = false;
66 var $checkpointJustWritten = false;
67 var $checkpointFiles = array();
68
/**
 * Initialise the progress counters and start the checkpoint clock.
 *
 * @param $history Mixed: history mode, as passed by dump(); forwarded to
 *   the parent implementation.
 */
function initProgress( $history ) {
	// Forward the mode instead of silently dropping it.
	// NOTE(review): the parent signature is not visible from this file. PHP
	// ignores surplus arguments to user-defined functions, so this is
	// harmless if the parent takes none - confirm against backup.inc.
	parent::initProgress( $history );

	// startTime is read straight after the parent call, so it is presumably
	// set there; checkpoints are timed relative to it.
	$this->timeOfCheckpoint = $this->startTime;
}
73
/**
 * Run the text pass: read the stub dump from $this->input, fill in the
 * revision text and write the result through the configured sink.
 *
 * @param $history Mixed: not used here; $this->history is what gets
 *   passed to initProgress().
 * @param $text Mixed: not used here.
 * @throws MWException if the input cannot be opened or the XML fails to parse
 */
function dump( $history, $text = WikiExporter::TEXT ) {
	# This shouldn't happen if on console... ;)
	header( 'Content-type: text/html; charset=UTF-8' );

	# Notice messages will foul up your XML output even if they're
	# relatively harmless.
	if ( ini_get( 'display_errors' ) ) {
		ini_set( 'display_errors', 'stderr' );
	}

	$this->initProgress( $this->history );

	$this->db = $this->backupDb();

	$this->egress = new ExportProgressFilter( $this->sink, $this );

	# it would be nice to do it in the constructor, oh well. need egress set
	$this->finalOptionCheck();

	# we only want this so we know how to close a stream :-P
	$this->xmlwriterobj = new XmlDumpWriter();

	$input = fopen( $this->input, "rt" );
	if ( $input === false ) {
		// Without this check an unreadable stub file was handed to fread()
		// as boolean false and the run ended "successfully" with no output.
		throw new MWException( "Unable to open input '{$this->input}' for reading." );
	}
	$result = $this->readDump( $input );
	// The handle is only needed by readDump(); release it before reporting.
	fclose( $input );

	if ( WikiError::isError( $result ) ) {
		throw new MWException( $result->getMessage() );
	}

	if ( $this->spawnProc ) {
		$this->closeSpawn();
	}

	$this->report( true );
}
108
/**
 * Apply one command-line option to this dumper.
 *
 * @param $opt String: option name
 * @param $val Mixed: option value (for file options, the wrapper type)
 * @param $param Mixed: option parameter (for file options, the file list)
 */
function processOption( $opt, $val, $param ) {
	global $IP;

	// Expanded up front for every option, although only 'prefetch' and
	// 'stub' consume the result.
	$expanded = $this->processFileOpt( $val, $param );

	if ( $opt == 'prefetch' ) {
		require_once "$IP/maintenance/backupPrefetch.inc";
		$this->prefetch = new BaseDump( $expanded );
	} elseif ( $opt == 'stub' ) {
		$this->input = $expanded;
	} elseif ( $opt == 'maxtime' ) {
		// Given in minutes, stored in seconds.
		$this->maxTimeAllowed = intval( $val ) * 60;
	} elseif ( $opt == 'checkpointfile' ) {
		$this->checkpointFiles[] = $val;
	} elseif ( $opt == 'current' ) {
		$this->history = WikiExporter::CURRENT;
	} elseif ( $opt == 'full' ) {
		$this->history = WikiExporter::FULL;
	} elseif ( $opt == 'spawn' ) {
		$this->spawn = true;
		if ( $val ) {
			// Optional value overrides the PHP binary used for the child.
			$this->php = $val;
		}
	}
}
141
/**
 * Turn a "<type>:<file>[;<file>...]" option into stream-wrapper URIs.
 *
 * @param $val Mixed: wrapper type: "file", "gzip", "bzip2" or "7zip";
 *   anything else leaves the names untouched
 * @param $param String: one or more file names separated by ';'
 * @return String: the rewritten names, joined with ';' again
 */
function processFileOpt( $val, $param ) {
	// The wrapper depends only on $val, so resolve it once. The loose
	// comparisons and their order mirror a switch statement.
	if ( $val == "file" ) {
		$scheme = "";
	} elseif ( $val == "gzip" ) {
		$scheme = "compress.zlib://";
	} elseif ( $val == "bzip2" ) {
		$scheme = "compress.bzip2://";
	} elseif ( $val == "7zip" ) {
		$scheme = "mediawiki.compress.7z://";
	} else {
		$scheme = "";
	}

	$wrapped = array();
	foreach ( explode( ';', $param ) as $target ) {
		$wrapped[] = $scheme . $target;
	}
	return implode( ';', $wrapped );
}
166
/**
 * Overridden to include prefetch ratio if enabled.
 *
 * Without a prefetch source this defers to the parent. Otherwise, when
 * reporting is on, it emits one progress line with overall and
 * since-last-report page/revision rates, prefetch percentages and an ETA,
 * then stores the current counters as the baseline for the next report.
 */
function showReport() {
	if ( !$this->prefetch ) {
		return parent::showReport();
	}

	if ( !$this->reporting ) {
		return;
	}

	$now = wfTimestamp( TS_DB );
	$nowts = wfTime();
	$deltaAll = $nowts - $this->startTime;
	$deltaPart = $nowts - $this->lastTime;
	$this->pageCountPart = $this->pageCount - $this->pageCountLast;
	$this->revCountPart = $this->revCount - $this->revCountLast;

	// The *Last counters hold the cumulative totals as of the previous
	// report (see the end of this method), so the current interval is the
	// difference, not the stored value itself.
	$fetchedPart = $this->fetchCount - $this->fetchCountLast;
	$prefetchedPart = $this->prefetchCount - $this->prefetchCountLast;

	if ( $deltaAll ) {
		// The ETA extrapolates from the fraction of revisions done; skip it
		// while that fraction is zero or undefined to avoid dividing by zero.
		if ( $this->revCount && $this->maxCount ) {
			$portion = $this->revCount / $this->maxCount;
			$eta = $this->startTime + $deltaAll / $portion;
			$etats = wfTimestamp( TS_DB, intval( $eta ) );
		} else {
			$etats = '-';
		}
		if ( $this->fetchCount ) {
			$fetchRate = 100.0 * $this->prefetchCount / $this->fetchCount;
		} else {
			$fetchRate = '-';
		}
		$pageRate = $this->pageCount / $deltaAll;
		$revRate = $this->revCount / $deltaAll;
	} else {
		$pageRate = '-';
		$revRate = '-';
		$etats = '-';
		$fetchRate = '-';
	}

	if ( $deltaPart ) {
		if ( $fetchedPart ) {
			$fetchRatePart = 100.0 * $prefetchedPart / $fetchedPart;
		} else {
			$fetchRatePart = '-';
		}
		$pageRatePart = $this->pageCountPart / $deltaPart;
		$revRatePart = $this->revCountPart / $deltaPart;
	} else {
		$fetchRatePart = '-';
		$pageRatePart = '-';
		$revRatePart = '-';
	}

	$this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
		$now, wfWikiID(), $this->ID, $this->pageCount, $pageRate, $pageRatePart, $this->revCount, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount ) );

	// Baseline for the next report.
	$this->lastTime = $nowts;
	$this->revCountLast = $this->revCount;
	$this->prefetchCountLast = $this->prefetchCount;
	$this->fetchCountLast = $this->fetchCount;
}
224
/**
 * Flag that the time budget for the current output file has run out.
 * endElement() acts on the flag at the next page close.
 */
function setTimeExceeded() {
	$this->timeExceeded = true;
}
228
/**
 * Tell whether more than maxTimeAllowed seconds lie between the last
 * checkpoint and the most recently recorded progress time.
 *
 * @return Boolean: always false when no time limit is set (maxTimeAllowed is 0)
 */
function checkIfTimeExceeded() {
	if ( !$this->maxTimeAllowed ) {
		return false;
	}
	$sinceCheckpoint = $this->lastTime - $this->timeOfCheckpoint;
	return $sinceCheckpoint > $this->maxTimeAllowed;
}
235
/**
 * Validate the checkpoint options once all of them have been read.
 *
 * Needs $this->egress, which is why dump() calls it rather than the
 * constructor.
 *
 * @throws MWException if only one of checkpointfile/maxtime was given, if
 *   a checkpoint pattern does not contain exactly two '%s', or if the
 *   number of patterns differs from the number of output files
 */
function finalOptionCheck() {
	$haveCheckpoints = (bool)$this->checkpointFiles;
	$haveTimeLimit = (bool)$this->maxTimeAllowed;

	// One without the other makes no sense.
	if ( $haveCheckpoints != $haveTimeLimit ) {
		throw new MWException( "Options checkpointfile and maxtime must be specified together.\n" );
	}

	foreach ( $this->checkpointFiles as $checkpointFile ) {
		$count = substr_count( $checkpointFile, "%s" );
		if ( $count != 2 ) {
			throw new MWException( "Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n" );
		}
	}

	if ( !$haveCheckpoints ) {
		return;
	}

	// getFilename() may hand back a single name or a list of names.
	$filenameList = $this->egress->getFilename();
	if ( !is_array( $filenameList ) ) {
		$filenameList = array( $filenameList );
	}
	if ( count( $filenameList ) != count( $this->checkpointFiles ) ) {
		throw new MWException( "One checkpointfile must be specified for each output option, if maxtime is used.\n" );
	}
}
258
/**
 * Feed the stub dump through an XML parser in 512 KiB chunks; the element
 * and character-data handlers of this class produce the output.
 *
 * When a time limit is set, whatever was written since the last checkpoint
 * is closed and renamed to its checkpoint name once the input is exhausted.
 *
 * @param $input Resource: open stream to read the stub XML from
 * @return Mixed: true on success, a WikiXmlError on a parse failure
 */
function readDump( $input ) {
	$this->buffer = "";
	$this->openElement = false;
	$this->atStart = true;
	$this->state = "";
	$this->lastName = "";
	$this->thisPage = 0;
	$this->thisRev = 0;

	$parser = xml_parser_create( "UTF-8" );
	xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING, false );

	// Objects are handles since PHP 5; the PHP 4 style "&$this" is unnecessary.
	xml_set_element_handler( $parser, array( $this, 'startElement' ), array( $this, 'endElement' ) );
	xml_set_character_data_handler( $parser, array( $this, 'characterData' ) );

	$offset = 0; // for context extraction on error reporting
	$bufferSize = 512 * 1024;
	do {
		// Only raises the flag; endElement() does the actual checkpointing
		// when the current page is complete.
		if ( $this->checkIfTimeExceeded() ) {
			$this->setTimeExceeded();
		}
		$chunk = fread( $input, $bufferSize );
		if ( !xml_parse( $parser, $chunk, feof( $input ) ) ) {
			wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
			return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
		}
		$offset += strlen( $chunk );
	} while ( $chunk !== false && !feof( $input ) );

	if ( $this->maxTimeAllowed ) {
		$filenameList = $this->egress->getFilename();
		# we wrote some stuff after last checkpoint that needs renamed
		if ( !is_array( $filenameList ) ) {
			$filenameList = array( $filenameList );
		}
		if ( file_exists( $filenameList[0] ) ) {
			$newFilenames = array();
			$firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
			$lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
			$fileCount = count( $filenameList );
			for ( $i = 0; $i < $fileCount; $i++ ) {
				$checkpointNameFilledIn = sprintf( $this->checkpointFiles[$i], $firstPageID, $lastPageID );
				$fileinfo = pathinfo( $filenameList[$i] );
				// Square brackets: the curly-brace offset syntax is a parse
				// error as of PHP 8.0.
				$newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
			}
			$this->egress->closeAndRename( $newFilenames );
		}
	}
	xml_parser_free( $parser );

	return true;
}
309
/**
 * Get the text for a text id, preferring the prefetch dump when one is
 * configured and its copy looks intact.
 *
 * @param $id Mixed: text id, used for the fallback lookup
 * @return String
 */
function getText( $id ) {
	$this->fetchCount++;

	$prefetched = null;
	if ( isset( $this->prefetch ) ) {
		$prefetched = $this->prefetch->prefetch( $this->thisPage, $this->thisRev );
	}

	if ( $prefetched !== null ) {
		// The prefetch source returned something for this revision. Only
		// trust it if its length matches rev_len in the database, so broken
		// text from an earlier dump is not carried forward.
		$dbr = wfGetDB( DB_SLAVE );
		$revID = intval( $this->thisRev );
		$expectedLength = $dbr->selectField( 'revision', 'rev_len', array( 'rev_id' => $revID ) );
		if ( strlen( $prefetched ) == $expectedLength ) {
			$this->prefetchCount++;
			return $prefetched;
		}
	}

	// No prefetch source, nothing returned, or a length mismatch.
	return $this->doGetText( $id );
}
328
/**
 * Fetch revision text by text id, retrying on failure.
 *
 * Each failed attempt is followed by a pause of $this->failureTimeout
 * seconds until more than $this->maxFailures attempts have failed; the
 * text is then given up on and "" is returned. Once more than
 * $this->maxConsecutiveFailedTextRetrievals texts in a row have been given
 * up on, the dump is declared broken.
 *
 * @param $id Mixed: text id
 * @return String: the text, or "" if it could not be retrieved
 * @throws MWException after too many consecutive texts could not be retrieved
 */
private function doGetText( $id ) {
	$id = intval( $id );
	$this->failures = 0;

	while ( true ) {
		if ( $this->spawn ) {
			if ( $this->failures ) {
				// we don't know why it failed, could be the child process
				// borked, could be db entry busted, could be db server out to lunch,
				// so cover all bases
				$this->closeSpawn();
				$this->openSpawn();
			}
			$text = $this->getTextSpawned( $id );
		} else {
			$text = $this->getTextDbSafe( $id );
		}

		if ( $text !== false ) {
			$this->failedTextRetrievals = 0;
			return $text;
		}

		$this->failures++;
		if ( $this->failures <= $this->maxFailures ) {
			$this->progress( "Error $this->failures " .
				"of allowed $this->maxFailures retrieving revision text for text id $id! " .
				"Pausing $this->failureTimeout seconds before retry..." );
			sleep( $this->failureTimeout );
			continue;
		}

		$this->progress( "Failed to retrieve revision text for text id ".
			"$id after $this->maxFailures tries, giving up" );
		// were there so many bad retrievals in a row we want to bail?
		// at some point we have to declare the dump irretrievably broken
		$this->failedTextRetrievals++;
		if ( $this->failedTextRetrievals > $this->maxConsecutiveFailedTextRetrievals ) {
			// Built only when actually thrown, rather than once per call.
			throw new MWException( "Graceful storage failure" );
		}
		// would be nice to return something better to the caller someday,
		// log what we know about the failure and about the revision
		return "";
	}
}
376
/**
 * Fetch a text revision from the database, turning a query error into a
 * plain false return. This makes a single attempt; retrying is left to
 * the caller.
 *
 * @param $id Integer: text id
 * @return Mixed: the text, or false on failure
 */
private function getTextDbSafe( $id ) {
	try {
		return $this->getTextDb( $id );
	} catch ( DBQueryError $ex ) {
		return false;
	}
}
392
/**
 * Load one row from the text table and return its text with carriage
 * returns removed and content-language normalisation applied.
 * May throw a database error if, say, the server dies during query.
 *
 * @param $id Integer: value to match against old_id
 * @return Mixed: the normalised text, or false if Revision::getRevisionText()
 *   returned false
 */
private function getTextDb( $id ) {
	global $wgContLang;

	$fields = array( 'old_text', 'old_flags' );
	$conds = array( 'old_id' => $id );
	$row = $this->db->selectRow( 'text', $fields, $conds, __METHOD__ );

	$raw = Revision::getRevisionText( $row );
	if ( $raw === false ) {
		return false;
	}

	return $wgContLang->normalize( str_replace( "\r", "", $raw ) );
}
410
/**
 * Fetch a text through the helper subprocess, starting it on first use.
 * Warnings are suppressed for the duration of the call.
 *
 * @param $id Integer: text id
 * @return Mixed: the text, or false on failure (including when the
 *   subprocess cannot be started)
 */
private function getTextSpawned( $id ) {
	wfSuppressWarnings();
	if ( !$this->spawnProc && !$this->openSpawn() ) {
		// No subprocess means no pipes. Report an ordinary failure instead
		// of handing boolean false to fwrite(), which throws a TypeError
		// on PHP 8.
		wfRestoreWarnings();
		return false;
	}
	$text = $this->getTextSpawnedOnce( $id );
	wfRestoreWarnings();
	return $text;
}
421
/**
 * Start the fetchText.php subprocess and keep the pipes to its stdin and
 * stdout. Its stderr is appended to /dev/null.
 *
 * @return Boolean: true if the process was started, false otherwise
 */
function openSpawn() {
	global $IP;

	$arguments = array(
		$this->php,
		"$IP/maintenance/fetchText.php",
		'--wiki', wfWikiID() );
	$escaped = array();
	foreach ( $arguments as $argument ) {
		$escaped[] = wfEscapeShellArg( $argument );
	}
	$cmd = implode( " ", $escaped );

	$descriptors = array(
		0 => array( "pipe", "r" ),
		1 => array( "pipe", "w" ),
		2 => array( "file", "/dev/null", "a" ) );
	$pipes = array();

	$this->progress( "Spawning database subprocess: $cmd" );
	$this->spawnProc = proc_open( $cmd, $descriptors, $pipes );
	if ( !$this->spawnProc ) {
		$this->progress( "Subprocess spawn failed." );
		return false;
	}

	$this->spawnWrite = $pipes[0]; // -> child's stdin
	$this->spawnRead = $pipes[1]; // <- child's stdout

	return true;
}
451
/**
 * Close the pipes to the helper subprocess, then the process itself, and
 * reset all handles to false. Warnings are suppressed while doing so.
 */
private function closeSpawn() {
	wfSuppressWarnings();
	if ( $this->spawnRead ) {
		fclose( $this->spawnRead );
	}
	$this->spawnRead = false;
	if ( $this->spawnWrite ) {
		fclose( $this->spawnWrite );
	}
	$this->spawnWrite = false;
	if ( $this->spawnErr ) {
		fclose( $this->spawnErr );
	}
	$this->spawnErr = false;
	if ( $this->spawnProc ) {
		// The handle comes from proc_open(), so it must be released with
		// proc_close(); pclose() only accepts popen() streams and never
		// closed the process.
		proc_close( $this->spawnProc );
	}
	$this->spawnProc = false;
	wfRestoreWarnings();
}
468
/**
 * Make one request to the helper subprocess: write the text id, then read
 * back an id line, a length line and that many bytes of text.
 *
 * @param $id Integer: text id
 * @return Mixed: the text with carriage returns removed and
 *   content-language normalisation applied, or false on any failure
 */
private function getTextSpawnedOnce( $id ) {
	global $wgContLang;

	$ok = fwrite( $this->spawnWrite, "$id\n" );
	// $this->progress( ">> $id" );
	if ( !$ok ) {
		return false;
	}

	$ok = fflush( $this->spawnWrite );
	// $this->progress( ">> [flush]" );
	if ( !$ok ) {
		return false;
	}

	// check that the text id they are sending is the one we asked for
	// this avoids out of sync revision text errors we have encountered in the past
	$newId = fgets( $this->spawnRead );
	if ( $newId === false ) {
		return false;
	}
	if ( $id != intval( $newId ) ) {
		return false;
	}

	$len = fgets( $this->spawnRead );
	// $this->progress( "<< " . trim( $len ) );
	if ( $len === false ) {
		return false;
	}

	$nbytes = intval( $len );
	// actual error, not zero-length text
	if ( $nbytes < 0 ) {
		return false;
	}

	$text = "";

	// Subprocess may not send everything at once, we have to loop.
	while ( $nbytes > strlen( $text ) ) {
		$buffer = fread( $this->spawnRead, $nbytes - strlen( $text ) );
		if ( $buffer === false ) {
			break;
		}
		// At end-of-file fread() returns "" rather than false. Without
		// this check a child that exits mid-record made this loop spin
		// forever.
		if ( $buffer === '' && feof( $this->spawnRead ) ) {
			break;
		}
		$text .= $buffer;
	}

	$gotbytes = strlen( $text );
	if ( $gotbytes != $nbytes ) {
		$this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
		return false;
	}

	// Do normalization in the dump thread...
	$stripped = str_replace( "\r", "", $text );
	$normalized = $wgContLang->normalize( $stripped );
	return $normalized;
}
518
/**
 * XML parser callback for an opening tag.
 *
 * Flushes any pending open element, hands the buffered markup to the
 * egress filter at page/revision boundaries, and replaces a <text id="...">
 * stub by the actual revision text.
 *
 * @param $parser Resource: the XML parser
 * @param $name String: element name
 * @param $attribs Array: element attributes
 */
function startElement( $parser, $name, $attribs ) {
	$this->checkpointJustWritten = false;

	$this->clearOpenElement( null );
	$this->lastName = $name;

	switch ( $name ) {
		case 'revision':
			$this->state = $name;
			$this->egress->writeOpenPage( null, $this->buffer );
			$this->buffer = "";
			break;
		case 'page':
			$this->state = $name;
			if ( $this->atStart ) {
				// Everything buffered before the first page is the stream header.
				$this->egress->writeOpenStream( $this->buffer );
				$this->buffer = "";
				$this->atStart = false;
			}
			break;
	}

	$isStub = ( $name == "text" && isset( $attribs['id'] ) );
	if ( !$isStub ) {
		$this->openElement = array( $name, $attribs );
		return;
	}

	// The stub's attributes are dropped; the filled-in element only
	// carries xml:space="preserve".
	$text = $this->getText( $attribs['id'] );
	$this->openElement = array( $name, array( 'xml:space' => 'preserve' ) );
	if ( strlen( $text ) > 0 ) {
		$this->characterData( $parser, $text );
	}
}
548
/**
 * XML parser callback for a closing tag.
 *
 * Completes the buffered markup and passes it to the egress filter at
 * revision, page and stream boundaries. If the time limit was exceeded,
 * the end of a page is also where the current output is closed, renamed to
 * its checkpoint name and a new output is opened.
 *
 * @param $parser Resource: the XML parser
 * @param $name String: element name
 */
function endElement( $parser, $name ) {
	$this->checkpointJustWritten = false;

	if ( $this->openElement ) {
		$this->clearOpenElement( "" );
	} else {
		$this->buffer .= "</$name>";
	}

	if ( $name == 'revision' ) {
		$this->egress->writeRevision( null, $this->buffer );
		$this->buffer = "";
		$this->thisRev = "";
	} elseif ( $name == 'page' ) {
		if ( !$this->firstPageWritten ) {
			$this->firstPageWritten = trim( $this->thisPage );
		}
		$this->lastPageWritten = trim( $this->thisPage );
		if ( $this->timeExceeded ) {
			$this->egress->writeClosePage( $this->buffer );
			# nasty hack, we can't just write the chardata after the
			# page tag, it will include leading blanks from the next line
			$this->egress->sink->write( "\n" );

			$this->buffer = $this->xmlwriterobj->closeStream();
			$this->egress->writeCloseStream( $this->buffer );

			$this->buffer = "";
			$this->thisPage = "";

			/* this could be more than one file if we had more than one output arg */
			$filenameList = $this->egress->getFilename();
			if ( !is_array( $filenameList ) ) {
				$filenameList = array( $filenameList );
			}
			$newFilenames = array();
			$firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
			$lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
			$fileCount = count( $filenameList );
			for ( $i = 0; $i < $fileCount; $i++ ) {
				$checkpointNameFilledIn = sprintf( $this->checkpointFiles[$i], $firstPageID, $lastPageID );
				$fileinfo = pathinfo( $filenameList[$i] );
				// Square brackets: the curly-brace offset syntax is a parse
				// error as of PHP 8.0.
				$newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
			}
			$this->egress->closeRenameAndReopen( $newFilenames );

			// Start the next file and restart the checkpoint clock.
			$this->buffer = $this->xmlwriterobj->openStream();
			$this->timeExceeded = false;
			$this->timeOfCheckpoint = $this->lastTime;
			$this->firstPageWritten = false;
			$this->checkpointJustWritten = true;
		} else {
			$this->egress->writeClosePage( $this->buffer );
			$this->buffer = "";
			$this->thisPage = "";
		}
	} elseif ( $name == 'mediawiki' ) {
		$this->egress->writeCloseStream( $this->buffer );
		$this->buffer = "";
	}
}
611
/**
 * XML parser callback for character data; also called directly by
 * startElement() to inject revision text.
 *
 * Collects the contents of <id> elements as the current page or revision
 * id and appends the escaped data to the output buffer.
 *
 * @param $parser Resource: the XML parser
 * @param $data String: character data
 */
function characterData( $parser, $data ) {
	$this->clearOpenElement( null );

	if ( $this->lastName == "id" ) {
		switch ( $this->state ) {
			case "revision":
				$this->thisRev .= $data;
				break;
			case "page":
				$this->thisPage .= $data;
				break;
		}
	}

	# have to skip the newline left over from closepagetag line of
	# end of checkpoint files. nasty hack!!
	if ( $this->checkpointJustWritten ) {
		if ( $data[0] == "\n" ) {
			$data = substr( $data, 1 );
		}
		$this->checkpointJustWritten = false;
	}

	$this->buffer .= htmlspecialchars( $data );
}
631
/**
 * Write out the pending open element, if there is one, and forget it.
 *
 * @param $style Mixed: passed through as the third argument of
 *   Xml::element(); callers here use null or ""
 */
function clearOpenElement( $style ) {
	if ( !$this->openElement ) {
		return;
	}
	list( $tag, $attribs ) = $this->openElement;
	$this->buffer .= Xml::element( $tag, $attribs, $style );
	$this->openElement = false;
}
638 }
639
640
// Entry point: run the text pass, or print usage when --help was given.
$dumper = new TextPassDumper( $argv );

if ( !isset( $options['help'] ) ) {
	$dumper->dump( true );
} else {
	// The checkpointfile description matches finalOptionCheck(), which
	// rejects patterns that do not contain exactly two %s.
	$dumper->progress( <<<ENDS
This script postprocesses XML dumps from dumpBackup.php to add
page text which was stubbed out (using --stub).

XML input is accepted on stdin.
XML output is sent to stdout; progress reports are sent to stderr.

Usage: php dumpTextPass.php [<options>]
Options:
--stub=<type>:<file> To load a compressed stub dump instead of stdin
--prefetch=<type>:<file> Use a prior dump file as a text source, to save
pressure on the database.
(Requires the XMLReader extension)
--maxtime=<minutes> Write out checkpoint file after this many minutes (writing
out complete page, closing xml file properly, and opening new one
with header). This option requires the checkpointfile option.
--checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
substituting first pageid written for the first %s and the last pageid
written for the second %s; both are required. This option requires the
maxtime option.
--quiet Don't dump status reports to stderr.
--report=n Report position and speed after every n pages processed.
(Default: 100)
--server=h Force reading from MySQL server h
--current Base ETA on number of pages in database instead of all revisions
--spawn Spawn a subprocess for loading text records
--help Display this help message
ENDS
);
}
675
676