add support for writing out checkpoint files of xml dump at regular intervals (close...
[lhc/web/wiklou.git] / maintenance / dumpTextPass.php
1 <?php
2 /**
3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
4 *
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
22 *
23 * @file
24 * @ingroup Maintenance
25 */
26
27 $originalDir = getcwd();
28
29 require_once( dirname( __FILE__ ) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
31
32 /**
33 * @ingroup Maintenance
34 */
35 class TextPassDumper extends BackupDumper {
36 var $prefetch = null;
37 var $input = "php://stdin";
38 var $history = WikiExporter::FULL;
39 var $fetchCount = 0;
40 var $prefetchCount = 0;
41 var $lastTime = 0;
42 var $pageCountLast = 0;
43 var $revCountLast = 0;
44 var $prefetchCountLast = 0;
45 var $fetchCountLast = 0;
46
47 var $failures = 0;
48 var $maxFailures = 5;
49 var $failedTextRetrievals = 0;
50 var $maxConsecutiveFailedTextRetrievals = 200;
51 var $failureTimeout = 5; // Seconds to sleep after db failure
52
53 var $php = "php";
54 var $spawn = false;
55 var $spawnProc = false;
56 var $spawnWrite = false;
57 var $spawnRead = false;
58 var $spawnErr = false;
59
60 var $ID = 0;
61
62 var $xmlwriterobj = false;
63
64 # when we spend more than maxTimeAllowed seconds on this run, we continue
65 # processing until we write out the next complete page, then save output file(s),
66 # rename it/them and open new one(s)
67 var $maxTimeAllowed = 0; // 0 = no limit
68 var $timeExceeded = false;
69 var $firstPageWritten = false;
70 var $lastPageWritten = false;
71 var $checkpointJustWritten = false;
72 var $checkpointFiles = array();
73
/**
 * Set up progress tracking for this run.
 *
 * Delegates to the parent, then records the process ID and seeds the
 * report clock and the checkpoint clock from $this->startTime.
 *
 * @param $history Integer: WikiExporter::FULL or WikiExporter::CURRENT
 */
function initProgress( $history ) {
	// Forward the history mode instead of dropping it; the --current
	// help text promises that it changes what the ETA is based on.
	// NOTE(review): assumes BackupDumper::initProgress() accepts this
	// argument -- confirm against backup.inc.
	parent::initProgress( $history );
	$this->ID = getmypid();
	$this->lastTime = $this->startTime;
	$this->timeOfCheckpoint = $this->startTime;
}
80
/**
 * Run the text pass: read the stub dump from $this->input and push the
 * result, with revision text filled in, through the output filter.
 *
 * @param $history Unused here; $this->history (set by --full/--current)
 *                 is what gets passed to initProgress().
 * @param $text Unused here.
 */
function dump( $history, $text = WikiExporter::TEXT ) {
	# This shouldn't happen if on console... ;)
	header( 'Content-type: text/html; charset=UTF-8' );

	# Notice messages will foul up your XML output even if they're
	# relatively harmless.
	if ( ini_get( 'display_errors' ) ) {
		ini_set( 'display_errors', 'stderr' );
	}

	$this->initProgress( $this->history );

	$this->db = $this->backupDb();

	$this->egress = new ExportProgressFilter( $this->sink, $this );

	# it would be nice to do it in the constructor, oh well. need egress set
	$this->finalOptionCheck();

	# we only want this so we know how to close a stream :-P
	$this->xmlwriterobj = new XmlDumpWriter();

	$input = fopen( $this->input, "rt" );
	if ( $input === false ) {
		// Without this check an unreadable stub file produced an empty
		// dump and a success report instead of an error.
		wfDie( "Unable to open input {$this->input}\n" );
	}
	$result = $this->readDump( $input );
	fclose( $input );

	if ( WikiError::isError( $result ) ) {
		wfDie( $result->getMessage() );
	}

	if ( $this->spawnProc ) {
		$this->closeSpawn();
	}

	$this->report( true );
}
115
/**
 * Apply one command-line option to this dumper.
 *
 * @param $opt String: option name
 * @param $val String: option value; for --stub and --prefetch this is
 *             the <type> part, for --maxtime the number of minutes, for
 *             --checkpointfile the filename pattern and for --spawn an
 *             optional php binary to use
 * @param $param String: for --stub and --prefetch, the <file> part
 */
function processOption( $opt, $val, $param ) {
	global $IP;

	// Computed for every option, although only the file options use it.
	$url = $this->processFileOpt( $val, $param );

	if ( $opt == 'prefetch' ) {
		require_once "$IP/maintenance/backupPrefetch.inc";
		$this->prefetch = new BaseDump( $url );
	} elseif ( $opt == 'stub' ) {
		$this->input = $url;
	} elseif ( $opt == 'maxtime' ) {
		// given in minutes, kept in seconds
		$this->maxTimeAllowed = intval( $val ) * 60;
	} elseif ( $opt == 'checkpointfile' ) {
		$this->checkpointFiles[] = $val;
	} elseif ( $opt == 'current' ) {
		$this->history = WikiExporter::CURRENT;
	} elseif ( $opt == 'full' ) {
		$this->history = WikiExporter::FULL;
	} elseif ( $opt == 'spawn' ) {
		$this->spawn = true;
		if ( $val ) {
			$this->php = $val;
		}
	}
}
148
/**
 * Turn a file type plus a ';'-separated list of paths into a
 * ';'-separated list of stream URIs.
 *
 * @param $val String: "file", "gzip", "bzip2" or "7zip"; any other
 *             value is treated like "file"
 * @param $param String: one or more paths separated by ';'
 * @return String: the same paths, each prefixed with the stream
 *         wrapper for the given type
 */
function processFileOpt( $val, $param ) {
	// The wrapper depends only on the type, so pick it once up front.
	switch ( $val ) {
		case "file":
			$wrapper = "";
			break;
		case "gzip":
			$wrapper = "compress.zlib://";
			break;
		case "bzip2":
			$wrapper = "compress.bzip2://";
			break;
		case "7zip":
			$wrapper = "mediawiki.compress.7z://";
			break;
		default:
			$wrapper = "";
	}

	$wrapped = array();
	foreach ( explode( ';', $param ) as $path ) {
		$wrapped[] = $wrapper . $path;
	}
	return implode( ';', $wrapped );
}
173
/**
 * Overridden to include prefetch ratio if enabled.
 *
 * Without a prefetch source the parent's report is used unchanged.
 * Otherwise, when reporting is on, one progress line is emitted with
 * page, revision and prefetch rates for the whole run ("all") and for
 * the interval since the previous report ("curr"), followed by an ETA.
 */
function showReport() {
	if ( !$this->prefetch ) {
		return parent::showReport();
	}

	if ( !$this->reporting ) {
		return;
	}

	// Keep the numeric clock and the printable timestamp apart:
	// lastTime is subtracted from wfTime() on the next call and is
	// compared with timeOfCheckpoint in checkIfTimeExceeded(), so it
	// must never hold the formatted TS_DB string.
	$now = wfTime();
	$nowts = wfTimestamp( TS_DB );
	$deltaAll = $now - $this->startTime;
	$deltaPart = $now - $this->lastTime;
	$this->pageCountPart = $this->pageCount - $this->pageCountLast;
	$this->revCountPart = $this->revCount - $this->revCountLast;
	// Fetch counters for this interval only, so that the "curr" figure
	// is not just the cumulative rate as of the previous report.
	$fetchCountPart = $this->fetchCount - $this->fetchCountLast;
	$prefetchCountPart = $this->prefetchCount - $this->prefetchCountLast;

	$pageRate = '-';
	$revRate = '-';
	$etats = '-';
	$fetchRate = '-';
	if ( $deltaAll ) {
		// An ETA can only be projected once some revisions are done
		// and the expected maximum is non-zero; otherwise the
		// divisions below would be by zero.
		if ( $this->revCount && $this->maxCount ) {
			$portion = $this->revCount / $this->maxCount;
			$eta = $this->startTime + $deltaAll / $portion;
			$etats = wfTimestamp( TS_DB, intval( $eta ) );
		}
		if ( $this->fetchCount ) {
			$fetchRate = 100.0 * $this->prefetchCount / $this->fetchCount;
		}
		$pageRate = $this->pageCount / $deltaAll;
		$revRate = $this->revCount / $deltaAll;
	}

	$fetchRatePart = '-';
	$pageRatePart = '-';
	$revRatePart = '-';
	if ( $deltaPart ) {
		if ( $fetchCountPart ) {
			$fetchRatePart = 100.0 * $prefetchCountPart / $fetchCountPart;
		}
		$pageRatePart = $this->pageCountPart / $deltaPart;
		$revRatePart = $this->revCountPart / $deltaPart;
	}

	$this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
		$nowts, wfWikiID(), $this->ID, $this->pageCount, $pageRate, $pageRatePart, $this->revCount, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount ) );

	// Remember where this report left off for the next interval.
	$this->lastTime = $now;
	$this->pageCountLast = $this->pageCount;
	$this->revCountLast = $this->revCount;
	$this->prefetchCountLast = $this->prefetchCount;
	$this->fetchCountLast = $this->fetchCount;
}
231
/**
 * Flag that the time limit has passed; endElement() acts on the flag
 * at the end of the next page.
 */
function setTimeExceeded() {
	$this->timeExceeded = true;
}
235
/**
 * Tell whether more than maxTimeAllowed seconds lie between the last
 * checkpoint and the most recently recorded report time.
 *
 * @return Boolean: always false when no time limit is set
 */
function checkIfTimeExceeded() {
	if ( !$this->maxTimeAllowed ) {
		return false;
	}
	$sinceCheckpoint = $this->lastTime - $this->timeOfCheckpoint;
	return $sinceCheckpoint > $this->maxTimeAllowed;
}
242
/**
 * Validate the checkpoint-related options once the output filter
 * ($this->egress) exists. Dies with a message on any inconsistency:
 *  - checkpointfile and maxtime must be given together or not at all;
 *  - each checkpointfile pattern must contain exactly two '%s';
 *  - when checkpointing, there must be one pattern per output file.
 */
function finalOptionCheck() {
	$haveCheckpointFiles = (bool)$this->checkpointFiles;
	$haveMaxTime = (bool)$this->maxTimeAllowed;
	if ( $haveCheckpointFiles != $haveMaxTime ) {
		wfDie( "Options checkpointfile and maxtime must be specified together.\n" );
	}

	foreach ( $this->checkpointFiles as $checkpointFile ) {
		$count = substr_count( $checkpointFile, "%s" );
		if ( $count != 2 ) {
			wfDie( "Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n" );
		}
	}

	// The output-count check only makes sense when checkpointing is in
	// use; run unconditionally it rejected every ordinary run, because
	// a single output never matches an empty list of patterns.
	if ( !$haveCheckpointFiles ) {
		return;
	}

	$filenameList = $this->egress->getFilename();
	if ( !is_array( $filenameList ) ) {
		$filenameList = array( $filenameList );
	}
	if ( count( $filenameList ) != count( $this->checkpointFiles ) ) {
		wfDie( "One checkpointfile must be specified for each output option, if maxtime is used.\n" );
	}
}
264
/**
 * Stream the stub XML from $input through an XML parser whose
 * callbacks (startElement, endElement, characterData) produce the
 * output. When checkpointing is enabled, whatever was written after
 * the last checkpoint is renamed to its checkpoint name at the end.
 *
 * @param $input Resource: open stream to read the stub dump from
 * @return true on success, or a WikiXmlError on a parse failure
 */
function readDump( $input ) {
	$this->buffer = "";
	$this->openElement = false;
	$this->atStart = true;
	$this->state = "";
	$this->lastName = "";
	$this->thisPage = 0;
	$this->thisRev = 0;

	$parser = xml_parser_create( "UTF-8" );
	xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING, false );

	xml_set_element_handler( $parser, array( $this, 'startElement' ), array( $this, 'endElement' ) );
	xml_set_character_data_handler( $parser, array( $this, 'characterData' ) );

	$offset = 0; // for context extraction on error reporting
	$bufferSize = 512 * 1024;
	do {
		if ( $this->checkIfTimeExceeded() ) {
			$this->setTimeExceeded();
		}
		$chunk = fread( $input, $bufferSize );
		$readFailed = ( $chunk === false );
		if ( $readFailed ) {
			// Treat a failed read as the end of the input, so that a
			// document cut short by the failure is reported as a
			// parse error instead of being silently accepted.
			$chunk = "";
		}
		$isFinal = $readFailed || feof( $input );
		if ( !xml_parse( $parser, $chunk, $isFinal ) ) {
			wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
			return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
		}
		$offset += strlen( $chunk );
	} while ( !$isFinal );

	if ( $this->maxTimeAllowed ) {
		$filenameList = $this->egress->getFilename();
		# we wrote some stuff after last checkpoint that needs renamed
		if ( !is_array( $filenameList ) ) {
			$filenameList = array( $filenameList );
		}
		if ( file_exists( $filenameList[0] ) ) {
			$newFilenames = array();
			$firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
			$lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
			$fileCount = count( $filenameList );
			for ( $i = 0; $i < $fileCount; $i++ ) {
				$checkpointNameFilledIn = sprintf( $this->checkpointFiles[$i], $firstPageID, $lastPageID );
				$fileinfo = pathinfo( $filenameList[$i] );
				// square brackets: the {} offset syntax no longer parses on PHP 8
				$newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
			}
			$this->egress->rename( $newFilenames );
		}
	}
	xml_parser_free( $parser );

	return true;
}
315
/**
 * Get the text for the revision currently being processed.
 *
 * A prefetched copy is used only when its byte length equals the
 * rev_len stored in the revision table; otherwise the text is loaded
 * by text ID through doGetText().
 *
 * @param $id String: text ID from the stub's <text id="..."> attribute
 * @return String
 */
function getText( $id ) {
	$this->fetchCount++;

	$prefetched = null;
	if ( isset( $this->prefetch ) ) {
		$prefetched = $this->prefetch->prefetch( $this->thisPage, $this->thisRev );
	}

	// A null result is taken to mean the prefetch dump has no entry.
	if ( $prefetched !== null ) {
		$dbr = wfGetDB( DB_SLAVE );
		$expectedLength = $dbr->selectField( 'revision', 'rev_len',
			array( 'rev_id' => intval( $this->thisRev ) ) );
		// On a length mismatch we reload from storage, which avoids
		// carrying forward broken data from previous xml dumps.
		if ( strlen( $prefetched ) == $expectedLength ) {
			$this->prefetchCount++;
			return $prefetched;
		}
	}

	return $this->doGetText( $id );
}
334
/**
 * Load revision text by text ID, retrying on failure.
 *
 * Up to maxFailures retries are made, sleeping failureTimeout seconds
 * before each. When the last retry fails too, an empty string is
 * returned, unless more than maxConsecutiveFailedTextRetrievals texts
 * in a row have been given up on, in which case an MWException is
 * thrown.
 *
 * @param $id String|Integer: text ID
 * @return String: the text, or "" when retrieval was given up
 */
private function doGetText( $id ) {
	$id = intval( $id );
	$this->failures = 0;
	$ex = new MWException( "Graceful storage failure" );

	for ( ; ; ) {
		if ( !$this->spawn ) {
			$text = $this->getTextDbSafe( $id );
		} else {
			if ( $this->failures ) {
				// we don't know why it failed, could be the child process
				// borked, could be db entry busted, could be db server out
				// to lunch, so cover all bases
				$this->closeSpawn();
				$this->openSpawn();
			}
			$text = $this->getTextSpawned( $id );
		}

		if ( $text !== false ) {
			$this->failedTextRetrievals = 0;
			return $text;
		}

		$this->failures++;
		if ( $this->failures <= $this->maxFailures ) {
			$this->progress( "Error $this->failures of allowed $this->maxFailures retrieving revision text for text id $id! Pausing $this->failureTimeout seconds before retry..." );
			sleep( $this->failureTimeout );
			continue;
		}

		$this->progress( "Failed to retrieve revision text for text id $id after $this->maxFailures tries, giving up" );
		// were there so many bad retrievals in a row we want to bail?
		// at some point we have to declare the dump irretrievably broken
		$this->failedTextRetrievals++;
		if ( $this->failedTextRetrievals > $this->maxConsecutiveFailedTextRetrievals ) {
			throw $ex;
		}
		// would be nice to return something better to the caller someday,
		// log what we know about the failure and about the revision
		return "";
	}
}
382
/**
 * Fetch a text revision from the database, turning a DBQueryError
 * into a false return value. No retry happens here; the caller
 * (doGetText) does the retrying.
 *
 * @param $id Integer: text ID
 * @return String|false
 */
private function getTextDbSafe( $id ) {
	try {
		return $this->getTextDb( $id );
	} catch ( DBQueryError $ex ) {
		return false;
	}
}
398
/**
 * Read one row of the text table and return its text with carriage
 * returns removed and passed through the content language's
 * normalize().
 *
 * May throw a database error if, say, the server dies during query.
 *
 * @param $id Integer: text ID (old_id)
 * @return String|false: false when Revision::getRevisionText() fails
 */
private function getTextDb( $id ) {
	global $wgContLang;

	$fields = array( 'old_text', 'old_flags' );
	$conds = array( 'old_id' => $id );
	$row = $this->db->selectRow( 'text', $fields, $conds, __METHOD__ );

	$text = Revision::getRevisionText( $row );
	if ( $text === false ) {
		return false;
	}

	return $wgContLang->normalize( str_replace( "\r", "", $text ) );
}
416
/**
 * Fetch text through the helper subprocess, starting it on first use.
 * Warnings are suppressed for the duration of the call.
 *
 * @param $id Integer: text ID
 * @return String|false: false when the subprocess could not be
 *         started or the exchange with it failed
 */
private function getTextSpawned( $id ) {
	wfSuppressWarnings();
	if ( !$this->spawnProc ) {
		// First time?
		$this->openSpawn();
	}
	if ( $this->spawnProc ) {
		$text = $this->getTextSpawnedOnce( $id );
	} else {
		// The spawn failed, so there are no pipes to talk to; report
		// a failure rather than writing to an invalid handle.
		$text = false;
	}
	wfRestoreWarnings();
	return $text;
}
427
/**
 * Start the fetchText.php subprocess and keep its stdin and stdout
 * pipes in $this->spawnWrite and $this->spawnRead. The child's stderr
 * is sent to /dev/null.
 *
 * @return Boolean: true if the process was started
 */
function openSpawn() {
	global $IP;

	$args = array(
		$this->php,
		"$IP/maintenance/fetchText.php",
		'--wiki',
		wfWikiID(),
	);
	$cmd = implode( " ", array_map( 'wfEscapeShellArg', $args ) );

	$spec = array(
		0 => array( "pipe", "r" ),
		1 => array( "pipe", "w" ),
		2 => array( "file", "/dev/null", "a" ),
	);
	$pipes = array();

	$this->progress( "Spawning database subprocess: $cmd" );
	$this->spawnProc = proc_open( $cmd, $spec, $pipes );
	if ( $this->spawnProc ) {
		$this->spawnWrite = $pipes[0]; // -> stdin
		$this->spawnRead = $pipes[1]; // <- stdout
		return true;
	}

	$this->progress( "Subprocess spawn failed." );
	return false;
}
457
/**
 * Close the pipes to the helper subprocess and then the process
 * itself, resetting all spawn handles to false. Warnings are
 * suppressed while doing so.
 */
private function closeSpawn() {
	wfSuppressWarnings();
	if ( $this->spawnRead ) {
		fclose( $this->spawnRead );
	}
	$this->spawnRead = false;
	if ( $this->spawnWrite ) {
		fclose( $this->spawnWrite );
	}
	$this->spawnWrite = false;
	if ( $this->spawnErr ) {
		fclose( $this->spawnErr );
	}
	$this->spawnErr = false;
	if ( $this->spawnProc ) {
		// The handle comes from proc_open(), so it has to be released
		// with proc_close(); pclose() is only for popen() handles.
		proc_close( $this->spawnProc );
	}
	$this->spawnProc = false;
	wfRestoreWarnings();
}
474
/**
 * Do one request/response exchange with the helper subprocess.
 *
 * Writes the text ID followed by a newline, then reads back a line
 * with the ID, a line with the byte count and finally that many bytes
 * of text.
 *
 * @param $id Integer: text ID
 * @return String|false: the text with carriage returns removed and
 *         normalized by the content language, or false on any failure
 */
private function getTextSpawnedOnce( $id ) {
	global $wgContLang;

	$ok = fwrite( $this->spawnWrite, "$id\n" );
	// $this->progress( ">> $id" );
	if ( !$ok ) {
		return false;
	}

	$ok = fflush( $this->spawnWrite );
	// $this->progress( ">> [flush]" );
	if ( !$ok ) {
		return false;
	}

	// check that the text id they are sending is the one we asked for
	// this avoids out of sync revision text errors we have encountered in the past
	$newId = fgets( $this->spawnRead );
	if ( $newId === false ) {
		return false;
	}
	if ( $id != intval( $newId ) ) {
		return false;
	}

	$len = fgets( $this->spawnRead );
	// $this->progress( "<< " . trim( $len ) );
	if ( $len === false ) {
		return false;
	}

	$nbytes = intval( $len );
	// actual error, not zero-length text
	if ( $nbytes < 0 ) {
		return false;
	}

	$text = "";

	// Subprocess may not send everything at once, we have to loop.
	while ( $nbytes > strlen( $text ) ) {
		$buffer = fread( $this->spawnRead, $nbytes - strlen( $text ) );
		if ( $buffer === false ) {
			break;
		}
		// Once the child has closed its end of the pipe fread() keeps
		// returning "" rather than false; stop instead of spinning
		// forever and let the length check below report the failure.
		if ( $buffer === '' && feof( $this->spawnRead ) ) {
			break;
		}
		$text .= $buffer;
	}

	$gotbytes = strlen( $text );
	if ( $gotbytes != $nbytes ) {
		$this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
		return false;
	}

	// Do normalization in the dump thread...
	$stripped = str_replace( "\r", "", $text );
	$normalized = $wgContLang->normalize( $stripped );
	return $normalized;
}
524
/**
 * XML parser callback for an opening tag.
 *
 * Flushes the buffered markup when a revision starts and, for the
 * very first page, writes the stream header. A <text> element that
 * carries an id attribute is replaced by one holding the actual text.
 *
 * @param $parser Resource: the XML parser
 * @param $name String: element name
 * @param $attribs Array: element attributes
 */
function startElement( $parser, $name, $attribs ) {
	$this->checkpointJustWritten = false;

	$this->clearOpenElement( null );
	$this->lastName = $name;

	$isRevision = ( $name == 'revision' );
	$isPage = ( $name == 'page' );
	if ( $isRevision || $isPage ) {
		$this->state = $name;
	}
	if ( $isRevision ) {
		$this->egress->writeOpenPage( null, $this->buffer );
		$this->buffer = "";
	} elseif ( $isPage && $this->atStart ) {
		$this->egress->writeOpenStream( $this->buffer );
		$this->buffer = "";
		$this->atStart = false;
	}

	if ( $name != "text" || !isset( $attribs['id'] ) ) {
		// Ordinary element: emitted as-is once its content is known.
		$this->openElement = array( $name, $attribs );
		return;
	}

	$text = $this->getText( $attribs['id'] );
	$this->openElement = array( $name, array( 'xml:space' => 'preserve' ) );
	if ( strlen( $text ) > 0 ) {
		$this->characterData( $parser, $text );
	}
}
554
/**
 * XML parser callback for a closing tag.
 *
 * Flushes the buffered markup to the output at the end of each
 * revision, each page and the mediawiki root element. When the time
 * limit has been exceeded, the end of a page is also where the
 * current output is closed, renamed to its checkpoint name and a
 * fresh output is started.
 *
 * @param $parser Resource: the XML parser (unused)
 * @param $name String: element name
 */
function endElement( $parser, $name ) {
	$this->checkpointJustWritten = false;

	if ( $this->openElement ) {
		$this->clearOpenElement( "" );
	} else {
		$this->buffer .= "</$name>";
	}

	if ( $name == 'revision' ) {
		$this->egress->writeRevision( null, $this->buffer );
		$this->buffer = "";
		$this->thisRev = "";
	} elseif ( $name == 'page' ) {
		if ( !$this->firstPageWritten ) {
			$this->firstPageWritten = trim( $this->thisPage );
		}
		$this->lastPageWritten = trim( $this->thisPage );
		if ( $this->timeExceeded ) {
			$this->egress->writeClosePage( $this->buffer );
			# nasty hack, we can't just write the chardata after the
			# page tag, it will include leading blanks from the next line
			$this->egress->sink->write( "\n" );

			$this->buffer = $this->xmlwriterobj->closeStream();
			$this->egress->writeCloseStream( $this->buffer );

			$this->buffer = "";
			$this->thisPage = "";

			# this could be more than one file if we had more than one output arg
			$filenameList = $this->egress->getFilename();
			if ( !is_array( $filenameList ) ) {
				$filenameList = array( $filenameList );
			}
			$newFilenames = array();
			$firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
			$lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
			$fileCount = count( $filenameList );
			for ( $i = 0; $i < $fileCount; $i++ ) {
				$checkpointNameFilledIn = sprintf( $this->checkpointFiles[$i], $firstPageID, $lastPageID );
				$fileinfo = pathinfo( $filenameList[$i] );
				// square brackets: the {} offset syntax no longer parses on PHP 8
				$newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
			}
			$this->egress->closeRenameAndReopen( $newFilenames );

			// Start the next file with a fresh header and restart the
			// checkpoint clock and first-page tracking.
			$this->buffer = $this->xmlwriterobj->openStream();
			$this->timeExceeded = false;
			$this->timeOfCheckpoint = $this->lastTime;
			$this->firstPageWritten = false;
			$this->checkpointJustWritten = true;
		} else {
			$this->egress->writeClosePage( $this->buffer );
			$this->buffer = "";
			$this->thisPage = "";
		}
	} elseif ( $name == 'mediawiki' ) {
		$this->egress->writeCloseStream( $this->buffer );
		$this->buffer = "";
	}
}
617
/**
 * XML parser callback for character data.
 *
 * Appends the escaped data to the output buffer and, while inside an
 * <id> element, also accumulates it as the current page or revision
 * ID depending on the parser state.
 *
 * @param $parser Resource: the XML parser (unused)
 * @param $data String: raw character data
 */
function characterData( $parser, $data ) {
	$this->clearOpenElement( null );

	if ( $this->lastName == "id" ) {
		switch ( $this->state ) {
			case "revision":
				$this->thisRev .= $data;
				break;
			case "page":
				$this->thisPage .= $data;
				break;
		}
	}

	# have to skip the newline left over from closepagetag line of
	# end of checkpoint files. nasty hack!!
	if ( $this->checkpointJustWritten ) {
		$this->checkpointJustWritten = false;
		if ( $data[0] == "\n" ) {
			$data = substr( $data, 1 );
		}
	}

	$this->buffer .= htmlspecialchars( $data );
}
637
/**
 * Write out the pending opening tag, if any, and forget it.
 *
 * @param $style Passed to Xml::element() as its contents argument:
 *               null when the element is being opened, "" when it is
 *               closed straight away.
 */
function clearOpenElement( $style ) {
	if ( !$this->openElement ) {
		return;
	}
	list( $tag, $attribs ) = $this->openElement;
	$this->buffer .= Xml::element( $tag, $attribs, $style );
	$this->openElement = false;
}
644 }
645
646
// Entry point: run the dump, or print usage when --help is given.
$dumper = new TextPassDumper( $argv );

if ( !isset( $options['help'] ) ) {
	$dumper->dump( true );
} else {
	// The --checkpointfile text states that both %s placeholders are
	// needed, matching what finalOptionCheck() enforces.
	$dumper->progress( <<<ENDS
This script postprocesses XML dumps from dumpBackup.php to add
page text which was stubbed out (using --stub).

XML input is accepted on stdin.
XML output is sent to stdout; progress reports are sent to stderr.

Usage: php dumpTextPass.php [<options>]
Options:
  --stub=<type>:<file> To load a compressed stub dump instead of stdin
  --prefetch=<type>:<file> Use a prior dump file as a text source, to save
              pressure on the database.
              (Requires the XMLReader extension)
  --maxtime=<minutes> Write out checkpoint file after this many minutes (writing
              out complete page, closing xml file properly, and opening new one
              with header).  This option requires the checkpointfile option.
  --checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
              substituting first pageid written for the first %s and the
              last pageid written for the second %s; both %s are required.
              This option requires the maxtime option.
  --quiet     Don't dump status reports to stderr.
  --report=n  Report position and speed after every n pages processed.
              (Default: 100)
  --server=h  Force reading from MySQL server h
  --current   Base ETA on number of pages in database instead of all revisions
  --spawn     Spawn a subprocess for loading text records
  --help      Display this help message
ENDS
	);
}
681
682