3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
24 * @ingroup Maintenance
27 $originalDir = getcwd();
29 require_once( dirname( __FILE__
) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
33 * @ingroup Maintenance
35 class TextPassDumper
extends BackupDumper
{
37 var $input = "php://stdin";
38 var $history = WikiExporter
::FULL
;
40 var $prefetchCount = 0;
41 var $prefetchCountLast = 0;
42 var $fetchCountLast = 0;
45 var $maxConsecutiveFailedTextRetrievals = 200;
46 var $failureTimeout = 5; // Seconds to sleep after db failure
50 var $spawnProc = false;
51 var $spawnWrite = false;
52 var $spawnRead = false;
53 var $spawnErr = false;
55 var $xmlwriterobj = false;
57 // when we spend more than maxTimeAllowed seconds on this run, we continue
58 // processing until we write out the next complete page, then save output file(s),
59 // rename it/them and open new one(s)
60 var $maxTimeAllowed = 0; // 0 = no limit
61 var $timeExceeded = false;
62 var $firstPageWritten = false;
63 var $lastPageWritten = false;
64 var $checkpointJustWritten = false;
65 var $checkpointFiles = array();
74 * Drop the database connection $this->db and try to get a new one.
76 * This function tries to get a /different/ connection if this is
77 * possible. Hence, (if this is possible) it switches to a different
78 * failover upon each call.
80 * This function resets $this->lb and closes all connections on it.
85 // Cleaning up old connections
86 if ( isset( $this->lb
) ) {
87 $this->lb
->closeAll();
90 assert( '! isset( $this->db ) || ! $this->db->isOpen() /* DB is either unset, or been closed via LB */' );
94 // Trying to set up new connection.
95 // We do /not/ retry upon failure, but delegate to encapsulating logic, to avoid
96 // individually retrying at different layers of code.
98 // 1. The LoadBalancer.
100 $this->lb
= wfGetLBFactory()->newMainLB();
101 } catch (Exception
$e) {
102 throw new MWException( __METHOD__
. " rotating DB failed to obtain new load balancer (" . $e->getMessage() . ")" );
106 // 2. The Connection, through the load balancer.
108 $this->db
= $this->lb
->getConnection( DB_SLAVE
, 'backup' );
109 } catch (Exception
$e) {
110 throw new MWException( __METHOD__
. " rotating DB failed to obtain new database (" . $e->getMessage() . ")" );
113 assert( 'isset( $this->lb ) && isset( $this->db ) && $this->db->isOpen() /* rotating the DB worked */' );
/**
 * Initialise progress tracking and start the checkpoint clock.
 *
 * Delegates to the parent implementation and then records the start time
 * as the time of the last checkpoint, which checkIfTimeExceeded() compares
 * against maxTimeAllowed.
 *
 * @param $history int History mode selected for this run; dump() passes
 *   $this->history here (WikiExporter::FULL or WikiExporter::CURRENT).
 */
function initProgress( $history ) {
	// Forward the history mode instead of silently dropping it, so that the
	// mode chosen on the command line (e.g. --current) actually reaches the
	// parent. NOTE(review): assumes the parent accepts an optional history
	// argument; PHP ignores surplus arguments to user functions, so this is
	// harmless if it does not - confirm against backup.inc.
	parent::initProgress( $history );

	// presumably startTime is set by the parent call above - confirm
	$this->timeOfCheckpoint = $this->startTime;
}
122 function dump( $history, $text = WikiExporter
::TEXT
) {
123 // This shouldn't happen if on console... ;)
124 header( 'Content-type: text/html; charset=UTF-8' );
126 // Notice messages will foul up your XML output even if they're
127 // relatively harmless.
128 if ( ini_get( 'display_errors' ) )
129 ini_set( 'display_errors', 'stderr' );
131 $this->initProgress( $this->history
);
133 // We are trying to get an initial database connection to avoid that the
134 // first try of this request's first call to getText fails. However, if
135 // obtaining a good DB connection fails it's not a serious issue, as
136 // getText does retry upon failure and can start without having a working
140 } catch (Exception
$e) {
141 // We do not even count this as failure. Just let eventual
143 $this->progress( "Getting initial DB connection failed (" .
144 $e->getMessage() . ")" );
147 $this->egress
= new ExportProgressFilter( $this->sink
, $this );
149 // it would be nice to do it in the constructor, oh well. need egress set
150 $this->finalOptionCheck();
152 // we only want this so we know how to close a stream :-P
153 $this->xmlwriterobj
= new XmlDumpWriter();
155 $input = fopen( $this->input
, "rt" );
156 $result = $this->readDump( $input );
158 if ( WikiError
::isError( $result ) ) {
159 throw new MWException( $result->getMessage() );
162 if ( $this->spawnProc
) {
166 $this->report( true );
169 function processOption( $opt, $val, $param ) {
171 $url = $this->processFileOpt( $val, $param );
175 require_once "$IP/maintenance/backupPrefetch.inc";
176 $this->prefetch
= new BaseDump( $url );
182 $this->maxTimeAllowed
= intval($val)*60;
184 case 'checkpointfile':
185 $this->checkpointFiles
[] = $val;
188 $this->history
= WikiExporter
::CURRENT
;
191 $this->history
= WikiExporter
::FULL
;
202 function processFileOpt( $val, $param ) {
203 $fileURIs = explode(';',$param);
204 foreach ( $fileURIs as $URI ) {
210 $newURI = "compress.zlib://$URI";
213 $newURI = "compress.bzip2://$URI";
216 $newURI = "mediawiki.compress.7z://$URI";
221 $newFileURIs[] = $newURI;
223 $val = implode( ';', $newFileURIs );
228 * Overridden to include prefetch ratio if enabled.
230 function showReport() {
231 if ( !$this->prefetch
) {
232 parent
::showReport();
236 if ( $this->reporting
) {
237 $now = wfTimestamp( TS_DB
);
239 $deltaAll = wfTime() - $this->startTime
;
240 $deltaPart = wfTime() - $this->lastTime
;
241 $this->pageCountPart
= $this->pageCount
- $this->pageCountLast
;
242 $this->revCountPart
= $this->revCount
- $this->revCountLast
;
245 $portion = $this->revCount
/ $this->maxCount
;
246 $eta = $this->startTime +
$deltaAll / $portion;
247 $etats = wfTimestamp( TS_DB
, intval( $eta ) );
248 if ( $this->fetchCount
) {
249 $fetchRate = 100.0 * $this->prefetchCount
/ $this->fetchCount
;
253 $pageRate = $this->pageCount
/ $deltaAll;
254 $revRate = $this->revCount
/ $deltaAll;
262 if ( $this->fetchCountLast
) {
263 $fetchRatePart = 100.0 * $this->prefetchCountLast
/ $this->fetchCountLast
;
265 $fetchRatePart = '-';
267 $pageRatePart = $this->pageCountPart
/ $deltaPart;
268 $revRatePart = $this->revCountPart
/ $deltaPart;
271 $fetchRatePart = '-';
275 $this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
276 $now, wfWikiID(), $this->ID
, $this->pageCount
, $pageRate, $pageRatePart, $this->revCount
, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount
) );
277 $this->lastTime
= $nowts;
278 $this->revCountLast
= $this->revCount
;
279 $this->prefetchCountLast
= $this->prefetchCount
;
280 $this->fetchCountLast
= $this->fetchCount
;
/**
 * Mark the time budget for the current output file as used up.
 *
 * readDump() calls this once checkIfTimeExceeded() reports true; endElement()
 * consults the flag when a page is closed and resets it after the checkpoint
 * file has been written.
 */
function setTimeExceeded() {
	$this->timeExceeded = true;
}
288 function checkIfTimeExceeded() {
289 if ( $this->maxTimeAllowed
&& ( $this->lastTime
- $this->timeOfCheckpoint
> $this->maxTimeAllowed
) ) {
295 function finalOptionCheck() {
296 if ( ( $this->checkpointFiles
&& ! $this->maxTimeAllowed
) ||
297 ( $this->maxTimeAllowed
&& !$this->checkpointFiles
) ) {
298 throw new MWException("Options checkpointfile and maxtime must be specified together.\n");
300 foreach ($this->checkpointFiles
as $checkpointFile) {
301 $count = substr_count ( $checkpointFile,"%s" );
303 throw new MWException("Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n");
307 if ( $this->checkpointFiles
) {
308 $filenameList = (array)$this->egress
->getFilenames();
309 if ( count( $filenameList ) != count( $this->checkpointFiles
) ) {
310 throw new MWException("One checkpointfile must be specified for each output option, if maxtime is used.\n");
315 function readDump( $input ) {
317 $this->openElement
= false;
318 $this->atStart
= true;
320 $this->lastName
= "";
324 $parser = xml_parser_create( "UTF-8" );
325 xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING
, false );
327 xml_set_element_handler( $parser, array( &$this, 'startElement' ), array( &$this, 'endElement' ) );
328 xml_set_character_data_handler( $parser, array( &$this, 'characterData' ) );
330 $offset = 0; // for context extraction on error reporting
331 $bufferSize = 512 * 1024;
333 if ($this->checkIfTimeExceeded()) {
334 $this->setTimeExceeded();
336 $chunk = fread( $input, $bufferSize );
337 if ( !xml_parse( $parser, $chunk, feof( $input ) ) ) {
338 wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
339 return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
341 $offset +
= strlen( $chunk );
342 } while ( $chunk !== false && !feof( $input ) );
343 if ($this->maxTimeAllowed
) {
344 $filenameList = (array)$this->egress
->getFilenames();
345 // we wrote some stuff after last checkpoint that needs to be renamed
346 if (file_exists($filenameList[0])) {
347 $newFilenames = array();
348 # we might have just written the header and footer and had no
349 # pages or revisions written... perhaps they were all deleted
350 # there's no pageID 0 so we use that. the caller is responsible
351 # for deciding what to do with a file containing only the
352 # siteinfo information and the mw tags.
353 if (! $this->firstPageWritten
) {
354 $firstPageID = str_pad(0,9,"0",STR_PAD_LEFT
);
355 $lastPageID = str_pad(0,9,"0",STR_PAD_LEFT
);
358 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
359 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
361 for ( $i = 0; $i < count( $filenameList ); $i++
) {
362 $checkpointNameFilledIn = sprintf( $this->checkpointFiles
[$i], $firstPageID, $lastPageID );
363 $fileinfo = pathinfo($filenameList[$i]);
364 $newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
366 $this->egress
->closeAndRename( $newFilenames );
369 xml_parser_free( $parser );
375 * Tries to get the revision text for a revision id.
377 * Upon errors, retries (Up to $this->maxFailures tries each call).
378 * If no good revision text could be obtained even after these retries, "" is returned.
379 * If no good revision text could be returned for
380 * $this->maxConsecutiveFailedTextRetrievals consecutive calls to getText, MWException
383 * @param $id string The revision id to get the text for
385 * @return string The revision text for $id, or ""
386 * @throws MWException
388 function getText( $id ) {
389 $prefetchNotTried = true; // Whether or not we already tried to get the text via prefetch.
390 $text = false; // The candidate for a good text. false if no proper value.
391 $failures = 0; // The number of times, this invocation of getText already failed.
393 static $consecutiveFailedTextRetrievals = 0; // The number of times getText failed without
394 // yielding a good text in between.
398 // To allow to simply return on success and do not have to worry about book keeping,
399 // we assume this fetch works (possibly after some retries). Nevertheless, we keep
400 // the old value, so we can restore it, if problems occur (See after the while loop).
401 $oldConsecutiveFailedTextRetrievals = $consecutiveFailedTextRetrievals;
402 $consecutiveFailedTextRetrievals = 0;
404 while ( $failures < $this->maxFailures
) {
406 // As soon as we found a good text for the $id, we will return immediately.
407 // Hence, if we make it past the try catch block, we know that we did not
411 // Step 1: Get some text (or reuse from previous iteration if checking
412 // for plausibility failed)
414 // Trying to get prefetch, if it has not been tried before
415 if ( $text === false && isset( $this->prefetch
) && $prefetchNotTried ) {
416 $prefetchNotTried = false;
417 $tryIsPrefetch = true;
418 $text = $this->prefetch
->prefetch( $this->thisPage
, $this->thisRev
);
419 if ( $text === null ) {
424 if ( $text === false ) {
425 // Fallback to asking the database
426 $tryIsPrefetch = false;
427 if ( $this->spawn
) {
428 $text = $this->getTextSpawned( $id );
430 $text = $this->getTextDb( $id );
434 if ( $text === false ) {
435 throw new MWException( "Generic error while obtaining text for id " . $id );
438 assert( '$text !== false' );
439 // We received a good candidate for the text of $id via some method
441 // Step 2: Checking for plausibility and return the text if it is
443 $revID = intval( $this->thisRev
);
444 if ( ! isset( $this->db
) ) {
445 throw new MWException( "No database available" );
447 $revLength = $this->db
->selectField( 'revision', 'rev_len', array( 'rev_id' => $revID ) );
448 if( strlen( $text ) == $revLength ) {
449 if ( $tryIsPrefetch ) {
450 $this->prefetchCount++
;
455 assert( 'strlen( $text ) != $revLength /* Obtained text unplausible */' );
457 throw new MWException( "Received text is unplausible for id " . $id );
459 assert( 'false /* text is either returned or exception has been thrown */' );
461 } catch (Exception
$e) {
462 $msg = "getting/checking text " . $id . " failed (".$e->getMessage().")";
463 if ( $failures +
1 < $this->maxFailures
) {
464 $msg .= " (Will retry " . ( $this->maxFailures
- $failures - 1) . " more times)";
466 $this->progress( $msg );
469 // Something went wrong; we did not get a text that was plausible :(
473 // After backing off for some time, we try to reboot the whole process as
474 // much as possible to not carry over failures from one part to the other
476 sleep( $this->failureTimeout
);
479 if ( $this->spawn
) {
483 } catch (Exception
$e) {
484 $this->progress( "Rebooting getText infrastructure failed (".$e->getMessage().")" .
485 " Trying to continue anyways" );
489 // Retrieving a good text for $id failed (at least) maxFailures times.
490 // We abort for this $id.
492 // Restoring the consecutive failures, and maybe aborting, if the dump
494 $consecutiveFailedTextRetrievals = $oldConsecutiveFailedTextRetrievals +
1;
495 if ( $consecutiveFailedTextRetrievals > $this->maxConsecutiveFailedTextRetrievals
) {
496 throw new MWException( "Graceful storage failure" );
504 * May throw a database error if, say, the server dies during query.
506 * @return bool|string
507 * @throws MWException
509 private function getTextDb( $id ) {
511 if ( ! isset( $this->db
) ) {
512 throw new MWException( __METHOD__
. "No database available" );
514 $row = $this->db
->selectRow( 'text',
515 array( 'old_text', 'old_flags' ),
516 array( 'old_id' => $id ),
518 $text = Revision
::getRevisionText( $row );
519 if ( $text === false ) {
522 $stripped = str_replace( "\r", "", $text );
523 $normalized = $wgContLang->normalize( $stripped );
527 private function getTextSpawned( $id ) {
528 wfSuppressWarnings();
529 if ( !$this->spawnProc
) {
533 $text = $this->getTextSpawnedOnce( $id );
538 function openSpawn() {
541 if ( file_exists( "$IP/../multiversion/MWScript.php" ) ) {
543 array_map( 'wfEscapeShellArg',
546 "$IP/../multiversion/MWScript.php",
548 '--wiki', wfWikiID() ) ) );
552 array_map( 'wfEscapeShellArg',
555 "$IP/maintenance/fetchText.php",
556 '--wiki', wfWikiID() ) ) );
559 0 => array( "pipe", "r" ),
560 1 => array( "pipe", "w" ),
561 2 => array( "file", "/dev/null", "a" ) );
564 $this->progress( "Spawning database subprocess: $cmd" );
565 $this->spawnProc
= proc_open( $cmd, $spec, $pipes );
566 if ( !$this->spawnProc
) {
568 $this->progress( "Subprocess spawn failed." );
572 $this->spawnWrite
, // -> stdin
573 $this->spawnRead
, // <- stdout
/**
 * Tear down the text-fetching subprocess.
 *
 * Closes whichever of the read, write and error pipe handles are set, then
 * the process handle, and resets each of the four properties to false so
 * that a later call sees "no subprocess".
 *
 * wfSuppressWarnings() is called first, so warnings raised while closing
 * are not reported. NOTE(review): the matching wfRestoreWarnings() call is
 * not visible in this excerpt - confirm it follows the last statement.
 */
579 private function closeSpawn() {
580 wfSuppressWarnings();
// Pipe the subprocess writes to (our read side).
581 if ( $this->spawnRead
)
582 fclose( $this->spawnRead
);
583 $this->spawnRead
= false;
// Pipe the subprocess reads from (our write side).
584 if ( $this->spawnWrite
)
585 fclose( $this->spawnWrite
);
586 $this->spawnWrite
= false;
// Error pipe, if one was opened.
587 if ( $this->spawnErr
)
588 fclose( $this->spawnErr
);
589 $this->spawnErr
= false;
// NOTE(review): $this->spawnProc is assigned from proc_open() in openSpawn().
// The PHP manual documents pclose() for handles opened by popen() and
// proc_close() for handles opened by proc_open(), so this call most likely
// should be proc_close( $this->spawnProc ) - confirm and fix.
590 if ( $this->spawnProc
)
591 pclose( $this->spawnProc
);
592 $this->spawnProc
= false;
596 private function getTextSpawnedOnce( $id ) {
599 $ok = fwrite( $this->spawnWrite
, "$id\n" );
600 // $this->progress( ">> $id" );
601 if ( !$ok ) return false;
603 $ok = fflush( $this->spawnWrite
);
604 // $this->progress( ">> [flush]" );
605 if ( !$ok ) return false;
607 // check that the text id they are sending is the one we asked for
608 // this avoids out of sync revision text errors we have encountered in the past
609 $newId = fgets( $this->spawnRead
);
610 if ( $newId === false ) {
613 if ( $id != intval( $newId ) ) {
617 $len = fgets( $this->spawnRead
);
618 // $this->progress( "<< " . trim( $len ) );
619 if ( $len === false ) return false;
621 $nbytes = intval( $len );
622 // actual error, not zero-length text
623 if ($nbytes < 0 ) return false;
627 // Subprocess may not send everything at once, we have to loop.
628 while ( $nbytes > strlen( $text ) ) {
629 $buffer = fread( $this->spawnRead
, $nbytes - strlen( $text ) );
630 if ( $buffer === false ) break;
634 $gotbytes = strlen( $text );
635 if ( $gotbytes != $nbytes ) {
636 $this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
640 // Do normalization in the dump thread...
641 $stripped = str_replace( "\r", "", $text );
642 $normalized = $wgContLang->normalize( $stripped );
646 function startElement( $parser, $name, $attribs ) {
647 $this->checkpointJustWritten
= false;
649 $this->clearOpenElement( null );
650 $this->lastName
= $name;
652 if ( $name == 'revision' ) {
653 $this->state
= $name;
654 $this->egress
->writeOpenPage( null, $this->buffer
);
656 } elseif ( $name == 'page' ) {
657 $this->state
= $name;
658 if ( $this->atStart
) {
659 $this->egress
->writeOpenStream( $this->buffer
);
661 $this->atStart
= false;
665 if ( $name == "text" && isset( $attribs['id'] ) ) {
666 $text = $this->getText( $attribs['id'] );
667 $this->openElement
= array( $name, array( 'xml:space' => 'preserve' ) );
668 if ( strlen( $text ) > 0 ) {
669 $this->characterData( $parser, $text );
672 $this->openElement
= array( $name, $attribs );
676 function endElement( $parser, $name ) {
677 $this->checkpointJustWritten
= false;
679 if ( $this->openElement
) {
680 $this->clearOpenElement( "" );
682 $this->buffer
.= "</$name>";
685 if ( $name == 'revision' ) {
686 $this->egress
->writeRevision( null, $this->buffer
);
689 } elseif ( $name == 'page' ) {
690 if (! $this->firstPageWritten
) {
691 $this->firstPageWritten
= trim($this->thisPage
);
693 $this->lastPageWritten
= trim($this->thisPage
);
694 if ($this->timeExceeded
) {
695 $this->egress
->writeClosePage( $this->buffer
);
696 // nasty hack, we can't just write the chardata after the
697 // page tag, it will include leading blanks from the next line
698 $this->egress
->sink
->write("\n");
700 $this->buffer
= $this->xmlwriterobj
->closeStream();
701 $this->egress
->writeCloseStream( $this->buffer
);
704 $this->thisPage
= "";
705 // this could be more than one file if we had more than one output arg
707 $filenameList = (array)$this->egress
->getFilenames();
708 $newFilenames = array();
709 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
710 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
711 for ( $i = 0; $i < count( $filenameList ); $i++
) {
712 $checkpointNameFilledIn = sprintf( $this->checkpointFiles
[$i], $firstPageID, $lastPageID );
713 $fileinfo = pathinfo($filenameList[$i]);
714 $newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
716 $this->egress
->closeRenameAndReopen( $newFilenames );
717 $this->buffer
= $this->xmlwriterobj
->openStream();
718 $this->timeExceeded
= false;
719 $this->timeOfCheckpoint
= $this->lastTime
;
720 $this->firstPageWritten
= false;
721 $this->checkpointJustWritten
= true;
724 $this->egress
->writeClosePage( $this->buffer
);
726 $this->thisPage
= "";
729 } elseif ( $name == 'mediawiki' ) {
730 $this->egress
->writeCloseStream( $this->buffer
);
/**
 * Character-data handler for the XML parser.
 *
 * Registered in readDump() via xml_set_character_data_handler(), and also
 * called directly by startElement() to inject fetched revision text.
 *
 * - Flushes any pending open element to the buffer.
 * - While inside an <id> element, accumulates the data into the current
 *   revision id or page id, depending on the parser state.
 * - Directly after a checkpoint, drops one leading newline from the data.
 * - Appends the escaped data to the output buffer.
 *
 * @param $parser resource The XML parser (unused except when re-dispatching)
 * @param $data string Raw character data
 */
function characterData( $parser, $data ) {
	$this->clearOpenElement( null );

	if ( $this->lastName == "id" ) {
		// Loose comparison, exactly like the == checks this replaces.
		switch ( $this->state ) {
			case "revision":
				$this->thisRev .= $data;
				break;
			case "page":
				$this->thisPage .= $data;
				break;
		}
	}

	// have to skip the newline left over from closepagetag line of
	// end of checkpoint files. nasty hack!!
	if ( $this->checkpointJustWritten ) {
		$data = ( $data[0] == "\n" ) ? substr( $data, 1 ) : $data;
		$this->checkpointJustWritten = false;
	}

	$escaped = htmlspecialchars( $data );
	$this->buffer .= $escaped;
}
/**
 * Flush the pending open element, if there is one, into the output buffer.
 *
 * startElement() stores the element as array( name, attributes ) in
 * $this->openElement; here it is rendered through Xml::element() and the
 * pending slot is reset to false.
 *
 * @param $style mixed Passed through unchanged as the third argument of
 *   Xml::element(); callers in this class pass null or "". Its exact
 *   meaning is defined by Xml::element().
 */
function clearOpenElement( $style ) {
	if ( !$this->openElement ) {
		// Nothing pending.
		return;
	}

	list( $tagName, $tagAttribs ) = $this->openElement;
	$this->buffer .= Xml::element( $tagName, $tagAttribs, $style );
	$this->openElement = false;
}
764 $dumper = new TextPassDumper( $argv );
766 if ( !isset( $options['help'] ) ) {
767 $dumper->dump( true );
769 $dumper->progress( <<<ENDS
770 This script postprocesses XML dumps from dumpBackup.php to add
771 page text which was stubbed out (using --stub).
773 XML input is accepted on stdin.
774 XML output is sent to stdout; progress reports are sent to stderr.
776 Usage: php dumpTextPass.php [<options>]
778 --stub=<type>:<file> To load a compressed stub dump instead of stdin
779 --prefetch=<type>:<file> Use a prior dump file as a text source, to save
780 pressure on the database.
781 (Requires the XMLReader extension)
782 --maxtime=<minutes> Write out checkpoint file after this many minutes (writing
783 out complete page, closing xml file properly, and opening new one
784 with header). This option requires the checkpointfile option.
785 --checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
786 substituting first pageid written for the first %s (required) and the
787 last pageid written for the second %s if it exists.
788 --quiet Don't dump status reports to stderr.
789 --report=n Report position and speed after every n pages processed.
791 --server=h Force reading from MySQL server h
792 --current Base ETA on number of pages in database instead of all revisions
793 --spawn Spawn a subprocess for loading text records
794 --help Display this help message