Move the check for if a Maintenance script supports --batch-size away from addDefault...
[lhc/web/wiklou.git] / maintenance / dumpTextPass.php
1 <?php
2 /**
3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
4 *
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
17 *
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
22 *
23 * @file
24 * @ingroup Maintenance
25 */
26
27 $originalDir = getcwd();
28
29 require_once( dirname( __FILE__ ) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
31
32 /**
33 * @ingroup Maintenance
34 */
35 class TextPassDumper extends BackupDumper {
/** Prior-dump text source (a BaseDump) created by the prefetch option; null when unused */
public $prefetch = null;
/** URI the stub dump is read from; the stub option replaces the stdin default */
public $input = "php://stdin";
/** WikiExporter::FULL or WikiExporter::CURRENT, selected by the full/current options */
public $history = WikiExporter::FULL;

/** Number of getText() calls so far */
public $fetchCount = 0;
/** Number of getText() calls answered from the prefetch source */
public $prefetchCount = 0;

# Snapshots taken at the previous progress report, used for "current" rates
public $lastTime = 0;
public $pageCountLast = 0;
public $revCountLast = 0;
public $prefetchCountLast = 0;
public $fetchCountLast = 0;

/** Failed attempts for the text id currently being fetched (reset per id) */
public $failures = 0;
/** Failed attempts tolerated for one text id before giving up on it */
public $maxFailures = 5;
/** Text ids given up on in a row; reset by the next successful fetch */
public $failedTextRetrievals = 0;
/** How many ids in a row may be given up on before the dump is aborted */
public $maxConsecutiveFailedTextRetrievals = 200;
public $failureTimeout = 5; // Seconds to sleep after db failure

/** Executable used to launch the text-fetching subprocess */
public $php = "php";
/** Whether text is fetched through a spawned subprocess (spawn option) */
public $spawn = false;
/** proc_open() handle of the subprocess, or false when none is running */
public $spawnProc = false;
/** Pipe to the subprocess's stdin */
public $spawnWrite = false;
/** Pipe from the subprocess's stdout */
public $spawnRead = false;
/** Closed by closeSpawn() when set; nothing in this class assigns it */
public $spawnErr = false;

/** Process id of this dumper, shown in progress lines */
public $ID = 0;

/** XmlDumpWriter used only to produce stream open/close markup at checkpoints */
public $xmlwriterobj = false;

# when we spend more than maxTimeAllowed seconds on this run, we continue
# processing until we write out the next complete page, then save output file(s),
# rename it/them and open new one(s)
public $maxTimeAllowed = 0; // 0 = no limit
public $timeExceeded = false;
public $firstPageWritten = false;
public $lastPageWritten = false;
public $checkpointJustWritten = false;
public $checkpointFiles = array();
/** Time the current checkpoint interval started; set by initProgress() and endElement() */
public $timeOfCheckpoint = 0;

# XML pass-through state; readDump() resets all of these before parsing
/** Output accumulated for the element(s) currently being copied */
public $buffer = "";
/** array( name, attribs ) of a start tag not yet written out, or false */
public $openElement = false;
/** True until the stream header has been written */
public $atStart = true;
/** 'page' or 'revision': which kind of element an <id> belongs to */
public $state = "";
/** Name of the most recently opened element */
public $lastName = "";
/** Page id text collected from the stub */
public $thisPage = 0;
/** Revision id text collected from the stub */
public $thisRev = 0;
73
/**
 * Start progress bookkeeping for this pass: run the parent initialisation,
 * record our process id, and start both the report clock and the checkpoint
 * clock at the parent's start time.
 *
 * @param int $history WikiExporter::FULL or WikiExporter::CURRENT
 */
function initProgress( $history = WikiExporter::FULL ) {
    // Forward the history mode rather than dropping it. Presumably the parent
    // uses it to size the ETA denominator (the --current help text promises
    // that) -- confirm against BackupDumper::initProgress(). A parent that
    // takes no argument simply ignores the extra one.
    parent::initProgress( $history );
    $this->ID = getmypid();
    // startTime is not assigned in this class; assumed to be set by the parent call above.
    $this->lastTime = $this->startTime;
    $this->timeOfCheckpoint = $this->startTime;
}
80
/**
 * Run the whole text pass: set up progress, database and output filter,
 * validate the options, then stream the stub dump through readDump().
 *
 * @param mixed $history Not used here; the mode comes from $this->history,
 *   which the current/full options set.
 * @param mixed $text Not used by this method.
 */
function dump( $history, $text = WikiExporter::TEXT ) {
    # This shouldn't happen if on console... ;)
    header( 'Content-type: text/html; charset=UTF-8' );

    # Notice messages will foul up your XML output even if they're
    # relatively harmless.
    if ( ini_get( 'display_errors' ) ) {
        ini_set( 'display_errors', 'stderr' );
    }

    $this->initProgress( $this->history );

    $this->db = $this->backupDb();

    $this->egress = new ExportProgressFilter( $this->sink, $this );

    # it would be nice to do it in the constructor, oh well. need egress set
    $this->finalOptionCheck();

    # we only want this so we know how to close a stream :-P
    $this->xmlwriterobj = new XmlDumpWriter();

    $input = fopen( $this->input, "rt" );
    if ( $input === false ) {
        // Without this check a bad --stub path would be parsed as an empty
        // document and the run would end "successfully" with no output.
        wfDie( "Unable to open stub dump input {$this->input}\n" );
    }
    $result = $this->readDump( $input );
    fclose( $input );

    if ( WikiError::isError( $result ) ) {
        wfDie( $result->getMessage() );
    }

    if ( $this->spawnProc ) {
        $this->closeSpawn();
    }

    $this->report( true );
}
115
/**
 * Apply one command-line option to this dumper.
 *
 * @param string $opt Option name
 * @param mixed $val Option value (a file type for the file-based options)
 * @param mixed $param Extra parameter (the file list for the file-based options)
 */
function processOption( $opt, $val, $param ) {
    global $IP;

    // Computed for every option, although only prefetch and stub consume it.
    $url = $this->processFileOpt( $val, $param );

    if ( $opt == 'prefetch' ) {
        require_once "$IP/maintenance/backupPrefetch.inc";
        $this->prefetch = new BaseDump( $url );
    } elseif ( $opt == 'stub' ) {
        $this->input = $url;
    } elseif ( $opt == 'maxtime' ) {
        // Given in minutes, stored in seconds.
        $this->maxTimeAllowed = intval( $val ) * 60;
    } elseif ( $opt == 'checkpointfile' ) {
        $this->checkpointFiles[] = $val;
    } elseif ( $opt == 'current' ) {
        $this->history = WikiExporter::CURRENT;
    } elseif ( $opt == 'full' ) {
        $this->history = WikiExporter::FULL;
    } elseif ( $opt == 'spawn' ) {
        $this->spawn = true;
        if ( $val ) {
            // A value overrides the executable used for the subprocess.
            $this->php = $val;
        }
    }
}
148
/**
 * Turn a file type plus a ';'-separated list of paths into a ';'-separated
 * list of stream URIs.
 *
 * @param string $val File type: "file", "gzip", "bzip2" or "7zip"; any other
 *   value leaves the paths unchanged
 * @param string $param One or more paths joined with ';'
 * @return string The paths, each prefixed with the wrapper for $val
 */
function processFileOpt( $val, $param ) {
    // The wrapper depends only on the type, so choose it once for all paths.
    switch ( $val ) {
        case "file":
            $wrapper = "";
            break;
        case "gzip":
            $wrapper = "compress.zlib://";
            break;
        case "bzip2":
            $wrapper = "compress.bzip2://";
            break;
        case "7zip":
            $wrapper = "mediawiki.compress.7z://";
            break;
        default:
            $wrapper = "";
    }

    $rewritten = array();
    foreach ( explode( ';', $param ) as $path ) {
        $rewritten[] = $wrapper . $path;
    }

    return implode( ';', $rewritten );
}
173
/**
 * Overridden to include prefetch ratio if enabled.
 *
 * Without a prefetch source this defers to the parent report. Otherwise,
 * when reporting is on, it prints one progress line with overall ("all") and
 * since-the-last-report ("curr") page, revision and prefetch rates plus an
 * ETA, then stores the snapshots the next report's "curr" figures are
 * measured against.
 */
function showReport() {
    if ( !$this->prefetch ) {
        return parent::showReport();
    }

    if ( !$this->reporting ) {
        return;
    }

    $now = wfTimestamp( TS_DB );
    $nowts = wfTime();
    $deltaAll = $nowts - $this->startTime;
    $deltaPart = $nowts - $this->lastTime;
    $this->pageCountPart = $this->pageCount - $this->pageCountLast;
    $this->revCountPart = $this->revCount - $this->revCountLast;
    // Fetches made since the previous report. The *Last fields hold the
    // cumulative totals as of that report, so the interval is the difference.
    $fetchCountPart = $this->fetchCount - $this->fetchCountLast;
    $prefetchCountPart = $this->prefetchCount - $this->prefetchCountLast;

    // Note: '-' placeholders passed to a %0.1f conversion print as 0.0.
    if ( $deltaAll ) {
        if ( $this->revCount && $this->maxCount ) {
            $portion = $this->revCount / $this->maxCount;
            $eta = $this->startTime + $deltaAll / $portion;
            $etats = wfTimestamp( TS_DB, intval( $eta ) );
        } else {
            // Nothing processed yet, or no known total: no ETA can be
            // computed (and both divisions above would be by zero).
            $etats = '-';
        }
        if ( $this->fetchCount ) {
            $fetchRate = 100.0 * $this->prefetchCount / $this->fetchCount;
        } else {
            $fetchRate = '-';
        }
        $pageRate = $this->pageCount / $deltaAll;
        $revRate = $this->revCount / $deltaAll;
    } else {
        $pageRate = '-';
        $revRate = '-';
        $etats = '-';
        $fetchRate = '-';
    }

    if ( $deltaPart ) {
        if ( $fetchCountPart ) {
            $fetchRatePart = 100.0 * $prefetchCountPart / $fetchCountPart;
        } else {
            $fetchRatePart = '-';
        }
        $pageRatePart = $this->pageCountPart / $deltaPart;
        $revRatePart = $this->revCountPart / $deltaPart;
    } else {
        $fetchRatePart = '-';
        $pageRatePart = '-';
        $revRatePart = '-';
    }

    $this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
        $now, wfWikiID(), $this->ID, $this->pageCount, $pageRate, $pageRatePart, $this->revCount, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount ) );

    $this->lastTime = $nowts;
    // The page snapshot has to advance with the others, otherwise the "curr"
    // page rate divides the all-time page count by the last interval.
    $this->pageCountLast = $this->pageCount;
    $this->revCountLast = $this->revCount;
    $this->prefetchCountLast = $this->prefetchCount;
    $this->fetchCountLast = $this->fetchCount;
}
231
/**
 * Flag that the time budget is used up; endElement() writes a checkpoint at
 * the end of the next page when this flag is set.
 */
function setTimeExceeded() {
    $this->timeExceeded = true;
}
235
/**
 * Tell whether the current checkpoint interval has run past maxTimeAllowed.
 *
 * The interval is measured up to $this->lastTime, not the current clock, so
 * the answer only changes when lastTime is advanced (this class does that in
 * initProgress() and showReport()).
 *
 * @return bool Always false when no time limit is configured
 */
function checkIfTimeExceeded() {
    if ( !$this->maxTimeAllowed ) {
        return false;
    }

    $sinceCheckpoint = $this->lastTime - $this->timeOfCheckpoint;
    return $sinceCheckpoint > $this->maxTimeAllowed;
}
246
/**
 * Validate the checkpoint options once the output filter exists.
 *
 * checkpointfile and maxtime must be given together, every checkpoint
 * pattern must contain exactly two '%s' placeholders, and there must be one
 * pattern per output file. Failures are reported through wfDie().
 */
function finalOptionCheck() {
    $haveCheckpointFiles = (bool)$this->checkpointFiles;
    $haveMaxTime = (bool)$this->maxTimeAllowed;
    if ( $haveCheckpointFiles != $haveMaxTime ) {
        wfDie("Options checkpointfile and maxtime must be specified together.\n");
    }

    foreach ( $this->checkpointFiles as $checkpointFile ) {
        $count = substr_count( $checkpointFile, "%s" );
        if ( $count != 2 ) {
            wfDie("Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n");
        }
    }

    // Only compare against the outputs when checkpointing was requested.
    // Unconditionally, a plain run (no checkpoint files, at least one output
    // entry) always failed this check.
    if ( $this->checkpointFiles ) {
        $filenameList = $this->egress->getFilename();
        if ( !is_array( $filenameList ) ) {
            $filenameList = array( $filenameList );
        }
        if ( count( $filenameList ) != count( $this->checkpointFiles ) ) {
            wfDie("One checkpointfile must be specified for each output option, if maxtime is used.\n");
        }
    }
}
267
/**
 * Stream the stub dump through an XML parser whose handlers copy it to the
 * output and fill in revision text.
 *
 * When a time limit is set, the output written since the last checkpoint is
 * renamed to its checkpoint name once the input is exhausted.
 *
 * @param resource $input Open handle on the stub dump
 * @return true|WikiXmlError True on success, an error object on a parse failure
 */
function readDump( $input ) {
    $this->buffer = "";
    $this->openElement = false;
    $this->atStart = true;
    $this->state = "";
    $this->lastName = "";
    $this->thisPage = 0;
    $this->thisRev = 0;

    $parser = xml_parser_create( "UTF-8" );
    xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING, false );

    // Objects are passed by handle, so no reference to $this is needed.
    xml_set_element_handler( $parser, array( $this, 'startElement' ), array( $this, 'endElement' ) );
    xml_set_character_data_handler( $parser, array( $this, 'characterData' ) );

    $offset = 0; // for context extraction on error reporting
    $bufferSize = 512 * 1024;
    do {
        // Only raises the flag; the checkpoint itself is written by
        // endElement() when the current page closes.
        if ( $this->checkIfTimeExceeded() ) {
            $this->setTimeExceeded();
        }
        $chunk = fread( $input, $bufferSize );
        if ( !xml_parse( $parser, $chunk, feof( $input ) ) ) {
            wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
            return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
        }
        $offset += strlen( $chunk );
    } while ( $chunk !== false && !feof( $input ) );

    if ( $this->maxTimeAllowed ) {
        $filenameList = $this->egress->getFilename();
        # we wrote some stuff after last checkpoint that needs renamed
        if ( !is_array( $filenameList ) ) {
            $filenameList = array( $filenameList );
        }
        if ( file_exists( $filenameList[0] ) ) {
            $newFilenames = array();
            $firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
            $lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
            $fileCount = count( $filenameList );
            for ( $i = 0; $i < $fileCount; $i++ ) {
                $checkpointNameFilledIn = sprintf( $this->checkpointFiles[$i], $firstPageID, $lastPageID );
                $fileinfo = pathinfo( $filenameList[$i] );
                // Square brackets: the {} offset syntax was removed in PHP 8.
                $newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
            }
            $this->egress->rename( $newFilenames );
        }
    }
    xml_parser_free( $parser );

    return true;
}
318
/**
 * Return the text for one <text id="..."> stub, preferring the prefetch
 * source over the database.
 *
 * Prefetched text is used only when its byte length equals the rev_len
 * recorded for the current revision; otherwise the text is loaded through
 * doGetText().
 *
 * @param mixed $id Text id taken from the stub
 * @return string
 */
function getText( $id ) {
    $this->fetchCount++;

    $prefetched = null;
    if ( isset( $this->prefetch ) ) {
        $prefetched = $this->prefetch->prefetch( $this->thisPage, $this->thisRev );
    }

    if ( $prefetched === null ) {
        // No prefetch source, or the entry is missing from the prefetch dump
        return $this->doGetText( $id );
    }

    $dbr = wfGetDB( DB_SLAVE );
    $expectedLength = $dbr->selectField(
        'revision',
        'rev_len',
        array( 'rev_id' => intval( $this->thisRev ) )
    );

    // if length of rev text in file doesn't match length in db, we reload
    // this avoids carrying forward broken data from previous xml dumps
    if ( strlen( $prefetched ) == $expectedLength ) {
        $this->prefetchCount++;
        return $prefetched;
    }

    return $this->doGetText( $id );
}
337
/**
 * Load one text record, retrying after failures.
 *
 * Each failed attempt is followed by a pause of failureTimeout seconds. Once
 * more than maxFailures attempts have failed, the id is given up on and ""
 * is returned. If more than maxConsecutiveFailedTextRetrievals ids in a row
 * are given up on, the exception is thrown instead.
 *
 * @param mixed $id Text id
 * @return string The text, or "" when this id was given up on
 * @throws MWException
 */
private function doGetText( $id ) {
    $id = intval( $id );
    $this->failures = 0;
    $ex = new MWException( "Graceful storage failure" );

    for ( ; ; ) {
        if ( !$this->spawn ) {
            $text = $this->getTextDbSafe( $id );
        } else {
            if ( $this->failures ) {
                // we don't know why it failed, could be the child process
                // borked, could be db entry busted, could be db server out to lunch,
                // so cover all bases
                $this->closeSpawn();
                $this->openSpawn();
            }
            $text = $this->getTextSpawned( $id );
        }

        if ( $text !== false ) {
            $this->failedTextRetrievals = 0;
            return $text;
        }

        $this->failures++;
        if ( $this->failures <= $this->maxFailures ) {
            $this->progress( "Error $this->failures " .
                "of allowed $this->maxFailures retrieving revision text for text id $id! " .
                "Pausing $this->failureTimeout seconds before retry..." );
            sleep( $this->failureTimeout );
            continue;
        }

        $this->progress( "Failed to retrieve revision text for text id ".
            "$id after $this->maxFailures tries, giving up" );

        // were there so many bad retrievals in a row we want to bail?
        // at some point we have to declare the dump irretrievably broken
        $this->failedTextRetrievals++;
        if ( $this->failedTextRetrievals > $this->maxConsecutiveFailedTextRetrievals ) {
            throw $ex;
        }

        // would be nice to return something better to the caller someday,
        // log what we know about the failure and about the revision
        return "";
    }
}
385
/**
 * Fetch a text revision from the database, turning a query error into a
 * plain failure result.
 *
 * A single attempt is made here; retrying is left to the caller.
 *
 * @param int $id Text id
 * @return string|false The text, or false when the fetch failed
 */
private function getTextDbSafe( $id ) {
    try {
        return $this->getTextDb( $id );
    } catch ( DBQueryError $ex ) {
        return false;
    }
}
401
/**
 * Read one row of the text table and return its text with carriage returns
 * removed and content-language normalization applied.
 *
 * May throw a database error if, say, the server dies during query.
 *
 * @param int $id Value matched against old_id
 * @return string|false False when Revision::getRevisionText() yields false
 */
private function getTextDb( $id ) {
    global $wgContLang;

    $fields = array( 'old_text', 'old_flags' );
    $conds = array( 'old_id' => $id );
    $row = $this->db->selectRow( 'text', $fields, $conds, __METHOD__ );

    $text = Revision::getRevisionText( $row );
    if ( $text === false ) {
        return false;
    }

    return $wgContLang->normalize( str_replace( "\r", "", $text ) );
}
419
/**
 * Fetch one text record through the subprocess, starting it first when none
 * is running. Warnings are suppressed for the duration of the call.
 *
 * @param int $id Text id
 * @return string|false
 */
private function getTextSpawned( $id ) {
    wfSuppressWarnings();

    $needsSpawn = !$this->spawnProc;
    if ( $needsSpawn ) {
        // First time?
        $this->openSpawn();
    }
    $result = $this->getTextSpawnedOnce( $id );

    wfRestoreWarnings();
    return $result;
}
430
/**
 * Launch maintenance/fetchText.php for the current wiki as a subprocess and
 * keep the pipes to its stdin and stdout. Its stderr is appended to
 * /dev/null.
 *
 * @return bool False when the process could not be started
 */
function openSpawn() {
    global $IP;

    $arguments = array(
        $this->php,
        "$IP/maintenance/fetchText.php",
        '--wiki',
        wfWikiID(),
    );
    $escaped = array();
    foreach ( $arguments as $argument ) {
        $escaped[] = wfEscapeShellArg( $argument );
    }
    $cmd = implode( " ", $escaped );

    $descriptors = array(
        0 => array( "pipe", "r" ),
        1 => array( "pipe", "w" ),
        2 => array( "file", "/dev/null", "a" ),
    );
    $pipes = array();

    $this->progress( "Spawning database subprocess: $cmd" );
    $this->spawnProc = proc_open( $cmd, $descriptors, $pipes );
    if ( !$this->spawnProc ) {
        $this->progress( "Subprocess spawn failed." );
        return false;
    }

    $this->spawnWrite = $pipes[0]; // -> stdin
    $this->spawnRead = $pipes[1]; // <- stdout

    return true;
}
460
/**
 * Shut down the subprocess: close whichever pipes are open, then close the
 * process handle, and reset all of them to false. Warnings are suppressed
 * for the duration of the call.
 */
private function closeSpawn() {
    wfSuppressWarnings();
    if ( $this->spawnRead ) {
        fclose( $this->spawnRead );
    }
    $this->spawnRead = false;
    if ( $this->spawnWrite ) {
        fclose( $this->spawnWrite );
    }
    $this->spawnWrite = false;
    if ( $this->spawnErr ) {
        fclose( $this->spawnErr );
    }
    $this->spawnErr = false;
    if ( $this->spawnProc ) {
        // The handle comes from proc_open(), so it has to be released with
        // proc_close(); pclose() only accepts popen() handles.
        proc_close( $this->spawnProc );
    }
    $this->spawnProc = false;
    wfRestoreWarnings();
}
477
/**
 * Make one request to the subprocess and read its reply.
 *
 * The exchange, as this method drives it: write the id on its own line, then
 * read back a line with the id, a line with a byte count, and that many
 * bytes of text.
 *
 * @param int $id Text id
 * @return string|false The text with carriage returns removed and
 *   content-language normalization applied, or false on any failure
 */
private function getTextSpawnedOnce( $id ) {
    global $wgContLang;

    $ok = fwrite( $this->spawnWrite, "$id\n" );
    // $this->progress( ">> $id" );
    if ( !$ok ) {
        return false;
    }

    $ok = fflush( $this->spawnWrite );
    // $this->progress( ">> [flush]" );
    if ( !$ok ) {
        return false;
    }

    // check that the text id they are sending is the one we asked for
    // this avoids out of sync revision text errors we have encountered in the past
    $newId = fgets( $this->spawnRead );
    if ( $newId === false ) {
        return false;
    }
    if ( $id != intval( $newId ) ) {
        return false;
    }

    $len = fgets( $this->spawnRead );
    // $this->progress( "<< " . trim( $len ) );
    if ( $len === false ) {
        return false;
    }

    $nbytes = intval( $len );
    // actual error, not zero-length text
    if ( $nbytes < 0 ) {
        return false;
    }

    $text = "";

    // Subprocess may not send everything at once, we have to loop.
    while ( $nbytes > strlen( $text ) ) {
        $buffer = fread( $this->spawnRead, $nbytes - strlen( $text ) );
        if ( $buffer === false ) {
            break;
        }
        if ( $buffer === '' && feof( $this->spawnRead ) ) {
            // The child closed its end before sending everything. fread()
            // keeps returning "" at end of stream, so without this exit the
            // loop would never finish.
            break;
        }
        $text .= $buffer;
    }

    $gotbytes = strlen( $text );
    if ( $gotbytes != $nbytes ) {
        $this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
        return false;
    }

    // Do normalization in the dump thread...
    $stripped = str_replace( "\r", "", $text );
    $normalized = $wgContLang->normalize( $stripped );
    return $normalized;
}
527
/**
 * XML start-tag handler.
 *
 * Writes out any start tag still pending, hands the buffered markup to the
 * output filter at <revision> and at the first <page>, and remembers the new
 * tag as pending. A <text> element carrying an id attribute has its text
 * fetched and emitted as character data.
 *
 * @param resource $parser
 * @param string $name Element name
 * @param array $attribs Element attributes
 */
function startElement( $parser, $name, $attribs ) {
    $this->checkpointJustWritten = false;

    $this->clearOpenElement( null );
    $this->lastName = $name;

    $isRevision = ( $name == 'revision' );
    $isPage = !$isRevision && ( $name == 'page' );
    if ( $isRevision || $isPage ) {
        $this->state = $name;
    }
    if ( $isRevision ) {
        $this->egress->writeOpenPage( null, $this->buffer );
        $this->buffer = "";
    } elseif ( $isPage && $this->atStart ) {
        // Everything buffered before the first page is the stream header.
        $this->egress->writeOpenStream( $this->buffer );
        $this->buffer = "";
        $this->atStart = false;
    }

    if ( $name != "text" || !isset( $attribs['id'] ) ) {
        $this->openElement = array( $name, $attribs );
        return;
    }

    // The stub's attributes are replaced by xml:space="preserve".
    $text = $this->getText( $attribs['id'] );
    $this->openElement = array( $name, array( 'xml:space' => 'preserve' ) );
    if ( strlen( $text ) > 0 ) {
        $this->characterData( $parser, $text );
    }
}
557
/**
 * XML end-tag handler.
 *
 * Finishes the current element in the buffer and flushes the buffer to the
 * output filter at the end of a revision, a page, or the whole document.
 * When the time limit has been reached, the end of a page also closes the
 * output stream, renames the output file(s) to their checkpoint names and
 * starts a new stream.
 *
 * @param resource $parser
 * @param string $name Element name
 */
function endElement( $parser, $name ) {
    $this->checkpointJustWritten = false;

    if ( $this->openElement ) {
        // Start tag still pending; clearOpenElement() writes it with style "".
        $this->clearOpenElement( "" );
    } else {
        $this->buffer .= "</$name>";
    }

    if ( $name == 'revision' ) {
        $this->egress->writeRevision( null, $this->buffer );
        $this->buffer = "";
        $this->thisRev = "";
    } elseif ( $name == 'page' ) {
        if ( !$this->firstPageWritten ) {
            $this->firstPageWritten = trim( $this->thisPage );
        }
        $this->lastPageWritten = trim( $this->thisPage );
        if ( $this->timeExceeded ) {
            $this->egress->writeClosePage( $this->buffer );
            # nasty hack, we can't just write the chardata after the
            # page tag, it will include leading blanks from the next line
            $this->egress->sink->write( "\n" );

            $this->buffer = $this->xmlwriterobj->closeStream();
            $this->egress->writeCloseStream( $this->buffer );

            $this->buffer = "";
            $this->thisPage = "";

            /* this could be more than one file if we had more than one output arg */
            $filenameList = $this->egress->getFilename();
            if ( !is_array( $filenameList ) ) {
                $filenameList = array( $filenameList );
            }
            $newFilenames = array();
            $firstPageID = str_pad( $this->firstPageWritten, 9, "0", STR_PAD_LEFT );
            $lastPageID = str_pad( $this->lastPageWritten, 9, "0", STR_PAD_LEFT );
            $fileCount = count( $filenameList );
            for ( $i = 0; $i < $fileCount; $i++ ) {
                $checkpointNameFilledIn = sprintf( $this->checkpointFiles[$i], $firstPageID, $lastPageID );
                $fileinfo = pathinfo( $filenameList[$i] );
                // Square brackets: the {} offset syntax was removed in PHP 8.
                $newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
            }
            $this->egress->closeRenameAndReopen( $newFilenames );

            // Start the next file with a fresh header and a fresh interval.
            $this->buffer = $this->xmlwriterobj->openStream();
            $this->timeExceeded = false;
            $this->timeOfCheckpoint = $this->lastTime;
            $this->firstPageWritten = false;
            $this->checkpointJustWritten = true;
        } else {
            $this->egress->writeClosePage( $this->buffer );
            $this->buffer = "";
            $this->thisPage = "";
        }
    } elseif ( $name == 'mediawiki' ) {
        $this->egress->writeCloseStream( $this->buffer );
        $this->buffer = "";
    }
}
620
/**
 * XML character-data handler.
 *
 * Appends the escaped data to the buffer. Data inside an <id> element is
 * also collected as the current page or revision id, depending on which of
 * the two was opened last.
 *
 * @param resource $parser
 * @param string $data
 */
function characterData( $parser, $data ) {
    $this->clearOpenElement( null );

    if ( $this->lastName == "id" ) {
        switch ( $this->state ) {
            case "revision":
                $this->thisRev .= $data;
                break;
            case "page":
                $this->thisPage .= $data;
                break;
        }
    }

    # have to skip the newline left over from closepagetag line of
    # end of checkpoint files. nasty hack!!
    if ( $this->checkpointJustWritten ) {
        if ( $data[0] == "\n" ) {
            $data = substr( $data, 1 );
        }
        $this->checkpointJustWritten = false;
    }

    $this->buffer .= htmlspecialchars( $data );
}
640
/**
 * Write the pending start tag, if there is one, into the buffer and forget it.
 *
 * @param mixed $style Passed through as the third argument of Xml::element()
 */
function clearOpenElement( $style ) {
    if ( !$this->openElement ) {
        return;
    }

    list( $tagName, $tagAttribs ) = $this->openElement;
    $this->buffer .= Xml::element( $tagName, $tagAttribs, $style );
    $this->openElement = false;
}
647 }
648
649
// Script entry point: run the text pass, or print usage when --help is given.
$dumper = new TextPassDumper( $argv );

if ( !isset( $options['help'] ) ) {
    // dump() ignores this argument; the mode comes from the current/full options.
    $dumper->dump( true );
} else {
    $dumper->progress( <<<ENDS
This script postprocesses XML dumps from dumpBackup.php to add
page text which was stubbed out (using --stub).

XML input is accepted on stdin.
XML output is sent to stdout; progress reports are sent to stderr.

Usage: php dumpTextPass.php [<options>]
Options:
  --stub=<type>:<file> To load a compressed stub dump instead of stdin
  --prefetch=<type>:<file> Use a prior dump file as a text source, to save
              pressure on the database.
              (Requires the XMLReader extension)
  --maxtime=<minutes> Write out checkpoint file after this many minutes (writing
              out complete page, closing xml file properly, and opening new one
              with header).  This option requires the checkpointfile option.
  --checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
              substituting the first pageid written for the first %s and the
              last pageid written for the second %s; both are required.
              This option requires the maxtime option, and one pattern must
              be given for each output file.
  --quiet     Don't dump status reports to stderr.
  --report=n  Report position and speed after every n pages processed.
              (Default: 100)
  --server=h  Force reading from MySQL server h
  --current   Base ETA on number of pages in database instead of all revisions
  --spawn     Spawn a subprocess for loading text records
  --help      Display this help message
ENDS
    );
}
684
685