3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
24 * @ingroup Maintenance
27 $originalDir = getcwd();
29 require_once( dirname( __FILE__
) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
33 * @ingroup Maintenance
35 class TextPassDumper
extends BackupDumper
{
37 var $input = "php://stdin";
38 var $history = WikiExporter
::FULL
;
40 var $prefetchCount = 0;
41 var $prefetchCountLast = 0;
42 var $fetchCountLast = 0;
46 var $failedTextRetrievals = 0;
47 var $maxConsecutiveFailedTextRetrievals = 200;
48 var $failureTimeout = 5; // Seconds to sleep after db failure
52 var $spawnProc = false;
53 var $spawnWrite = false;
54 var $spawnRead = false;
55 var $spawnErr = false;
57 var $xmlwriterobj = false;
59 // when we spend more than maxTimeAllowed seconds on this run, we continue
60 // processing until we write out the next complete page, then save output file(s),
61 // rename it/them and open new one(s)
62 var $maxTimeAllowed = 0; // 0 = no limit
63 var $timeExceeded = false;
64 var $firstPageWritten = false;
65 var $lastPageWritten = false;
66 var $checkpointJustWritten = false;
67 var $checkpointFiles = array();
/**
 * Start progress tracking for this run and take the run's start time as the
 * time of the most recent checkpoint.
 *
 * @param $history Mixed: accepted for signature compatibility; it is not
 *   used by this method and is not forwarded to the parent
 */
function initProgress( $history ) {
	parent::initProgress();

	// presumably $this->startTime is set by parent::initProgress() -- confirm
	$runStart = $this->startTime;
	$this->timeOfCheckpoint = $runStart;
}
/**
 * Run the text pass: set up progress reporting, the database handle and the
 * output filter, then feed the XML read from $this->input through readDump().
 *
 * NOTE(review): neither $history nor $text is referenced in the visible body;
 * progress is initialised from $this->history instead.
 * NOTE(review): the fopen() result is handed to readDump() without a visible
 * failure check, and no fclose() is visible -- confirm against the full file.
 *
 * @param $history Mixed
 * @param $text Mixed: defaults to WikiExporter::TEXT
 * @throws MWException when readDump() returns a WikiError
 */
74 function dump( $history, $text = WikiExporter
::TEXT
) {
75 // This shouldn't happen if on console... ;)
76 header( 'Content-type: text/html; charset=UTF-8' );
78 // Notice messages will foul up your XML output even if they're
79 // relatively harmless.
80 if ( ini_get( 'display_errors' ) )
81 ini_set( 'display_errors', 'stderr' );
83 $this->initProgress( $this->history
);
85 $this->db
= $this->backupDb();
87 $this->egress
= new ExportProgressFilter( $this->sink
, $this );
89 // it would be nice to do it in the constructor, oh well. need egress set
90 $this->finalOptionCheck();
92 // we only want this so we know how to close a stream :-P
93 $this->xmlwriterobj
= new XmlDumpWriter();
95 $input = fopen( $this->input
, "rt" );
96 $result = $this->readDump( $input );
98 if ( WikiError
::isError( $result ) ) {
99 throw new MWException( $result->getMessage() );
// The body of the spawnProc branch below is not visible in this excerpt.
102 if ( $this->spawnProc
) {
106 $this->report( true );
/**
 * Apply one command-line option to this dumper.
 *
 * Effects visible here: $this->prefetch becomes a BaseDump opened on the URL
 * returned by processFileOpt(); $this->maxTimeAllowed is set to
 * intval( $val ) * 60; each 'checkpointfile' value is appended to
 * $this->checkpointFiles; $this->history is set to WikiExporter::CURRENT or
 * WikiExporter::FULL.
 *
 * NOTE(review): apart from 'checkpointfile', the case labels are not visible
 * in this excerpt, so which option name triggers which effect must be
 * confirmed against the full file.
 *
 * @param $opt Mixed
 * @param $val Mixed: option value
 * @param $param Mixed: passed through to processFileOpt()
 */
109 function processOption( $opt, $val, $param ) {
111 $url = $this->processFileOpt( $val, $param );
115 require_once "$IP/maintenance/backupPrefetch.inc";
116 $this->prefetch
= new BaseDump( $url );
122 $this->maxTimeAllowed
= intval($val)*60;
124 case 'checkpointfile':
125 $this->checkpointFiles
[] = $val;
128 $this->history
= WikiExporter
::CURRENT
;
131 $this->history
= WikiExporter
::FULL
;
/**
 * Turn a ';'-separated list of file names into a ';'-separated list of
 * stream URIs.
 *
 * $param is split on ';'; each entry may be rewritten with a
 * "compress.zlib://", "compress.bzip2://" or "mediawiki.compress.7z://"
 * prefix, and the results are joined with ';' into $val.
 *
 * NOTE(review): the conditions that pick a prefix, the initialisation of
 * $newFileURIs and the return statement are not visible in this excerpt.
 *
 * @param $val Mixed: overwritten with the joined URI list
 * @param $param String: ';'-separated file names
 */
142 function processFileOpt( $val, $param ) {
143 $fileURIs = explode(';',$param);
144 foreach ( $fileURIs as $URI ) {
150 $newURI = "compress.zlib://$URI";
153 $newURI = "compress.bzip2://$URI";
156 $newURI = "mediawiki.compress.7z://$URI";
161 $newFileURIs[] = $newURI;
163 $val = implode( ';', $newFileURIs );
168 * Overridden to include prefetch ratio if enabled.
// Without a prefetch source this defers to parent::showReport(). Otherwise,
// when reporting is enabled, it computes overall and since-last-report rates
// for pages, revisions and prefetch hits, estimates an ETA from
// revCount / maxCount, emits one progress line and stores the current
// counters as the new "last" values.
//
// NOTE(review): $fetchRatePart can be the string '-', but it is printed with
// a %0.1f conversion, which formats a non-numeric string as 0.0.
// NOTE(review): $this->pageCountLast is read but not updated in the visible
// lines, and $nowts is assigned outside them -- confirm against the full file.
// NOTE(review): any guards around the divisions by $deltaAll, $deltaPart and
// $portion are not visible in this excerpt.
170 function showReport() {
171 if ( !$this->prefetch
) {
172 return parent
::showReport();
175 if ( $this->reporting
) {
176 $now = wfTimestamp( TS_DB
);
178 $deltaAll = wfTime() - $this->startTime
;
179 $deltaPart = wfTime() - $this->lastTime
;
180 $this->pageCountPart
= $this->pageCount
- $this->pageCountLast
;
181 $this->revCountPart
= $this->revCount
- $this->revCountLast
;
184 $portion = $this->revCount
/ $this->maxCount
;
185 $eta = $this->startTime +
$deltaAll / $portion;
186 $etats = wfTimestamp( TS_DB
, intval( $eta ) );
187 if ( $this->fetchCount
) {
188 $fetchRate = 100.0 * $this->prefetchCount
/ $this->fetchCount
;
193 $pageRate = $this->pageCount
/ $deltaAll;
194 $revRate = $this->revCount
/ $deltaAll;
202 if ( $this->fetchCountLast
) {
203 $fetchRatePart = 100.0 * $this->prefetchCountLast
/ $this->fetchCountLast
;
206 $fetchRatePart = '-';
208 $pageRatePart = $this->pageCountPart
/ $deltaPart;
209 $revRatePart = $this->revCountPart
/ $deltaPart;
212 $fetchRatePart = '-';
216 $this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
217 $now, wfWikiID(), $this->ID
, $this->pageCount
, $pageRate, $pageRatePart, $this->revCount
, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount
) );
218 $this->lastTime
= $nowts;
219 $this->revCountLast
= $this->revCount
;
220 $this->prefetchCountLast
= $this->prefetchCount
;
221 $this->fetchCountLast
= $this->fetchCount
;
/**
 * Flag that the time budget for the current output file has been used up.
 *
 * endElement() reads this flag when a page closes to decide whether to
 * write a checkpoint.
 */
function setTimeExceeded() {
	$this->timeExceeded = true;
}
/**
 * Test whether more than $this->maxTimeAllowed seconds lie between the last
 * checkpoint ($this->timeOfCheckpoint) and the last progress timestamp
 * ($this->lastTime). A maxTimeAllowed of 0 makes the condition false.
 *
 * NOTE(review): the return statements are not visible in this excerpt;
 * readDump() uses the result as a boolean condition.
 */
229 function checkIfTimeExceeded() {
230 if ( $this->maxTimeAllowed
&& ( $this->lastTime
- $this->timeOfCheckpoint
> $this->maxTimeAllowed
) ) {
/**
 * Validate the checkpoint-related options; needs $this->egress to be set.
 *
 * Visible checks:
 *  - checkpoint files and a maximum time must be given together;
 *  - the "%s" placeholders in every checkpoint file name are counted (per
 *    the error text two are required; the comparison itself is not visible
 *    in this excerpt);
 *  - when checkpoint files are given, there must be as many of them as
 *    there are file names returned by $this->egress->getFilenames().
 *
 * NOTE(review): the --help text in this file describes the second %s as
 * optional, which disagrees with the "must contain two" error message.
 *
 * @throws MWException when a check fails
 */
236 function finalOptionCheck() {
237 if ( ( $this->checkpointFiles
&& ! $this->maxTimeAllowed
) ||
238 ( $this->maxTimeAllowed
&& !$this->checkpointFiles
) ) {
239 throw new MWException("Options checkpointfile and maxtime must be specified together.\n");
241 foreach ($this->checkpointFiles
as $checkpointFile) {
242 $count = substr_count ( $checkpointFile,"%s" );
244 throw new MWException("Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n");
248 if ( $this->checkpointFiles
) {
249 $filenameList = (array)$this->egress
->getFilenames();
250 if ( count( $filenameList ) != count( $this->checkpointFiles
) ) {
251 throw new MWException("One checkpointfile must be specified for each output option, if maxtime is used.\n");
/**
 * Parse the stub XML from $input in chunks of 512 * 1024 bytes, using an XML
 * parser whose handlers are this object's startElement(), endElement() and
 * characterData().
 *
 * Before each chunk is read the elapsed-time check runs and, if it fires,
 * the timeExceeded flag is set. After the loop, when maxTimeAllowed is set
 * and the first output file exists, the output files are renamed to
 * checkpoint names built from the first and last page IDs written (padded
 * to 9 digits with zeros; all zeros when no page was written).
 *
 * NOTE(review): the wfDebug() message names "TextDumpPass" although the
 * class is TextPassDumper.
 * NOTE(review): $chunk is only compared against false after it has already
 * been passed to xml_parse() and strlen().
 *
 * @param $input resource: stream to read the stub dump from
 * @return WikiXmlError on a parse failure; the return value on success is
 *   not visible in this excerpt
 */
256 function readDump( $input ) {
258 $this->openElement
= false;
259 $this->atStart
= true;
261 $this->lastName
= "";
265 $parser = xml_parser_create( "UTF-8" );
266 xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING
, false );
268 xml_set_element_handler( $parser, array( &$this, 'startElement' ), array( &$this, 'endElement' ) );
269 xml_set_character_data_handler( $parser, array( &$this, 'characterData' ) );
271 $offset = 0; // for context extraction on error reporting
272 $bufferSize = 512 * 1024;
274 if ($this->checkIfTimeExceeded()) {
275 $this->setTimeExceeded();
277 $chunk = fread( $input, $bufferSize );
278 if ( !xml_parse( $parser, $chunk, feof( $input ) ) ) {
279 wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
280 return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
282 $offset +
= strlen( $chunk );
283 } while ( $chunk !== false && !feof( $input ) );
284 if ($this->maxTimeAllowed
) {
285 $filenameList = (array)$this->egress
->getFilenames();
286 // we wrote some stuff after the last checkpoint that needs to be renamed
287 if (file_exists($filenameList[0])) {
288 $newFilenames = array();
289 # we might have just written the header and footer and had no
290 # pages or revisions written... perhaps they were all deleted
291 # there's no pageID 0 so we use that. the caller is responsible
292 # for deciding what to do with a file containing only the
293 # siteinfo information and the mw tags.
294 if (! $this->firstPageWritten
) {
295 $firstPageID = str_pad(0,9,"0",STR_PAD_LEFT
);
296 $lastPageID = str_pad(0,9,"0",STR_PAD_LEFT
);
299 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
300 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
302 for ( $i = 0; $i < count( $filenameList ); $i++
) {
303 $checkpointNameFilledIn = sprintf( $this->checkpointFiles
[$i], $firstPageID, $lastPageID );
304 $fileinfo = pathinfo($filenameList[$i]);
305 $newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
307 $this->egress
->closeAndRename( $newFilenames );
310 xml_parser_free( $parser );
/**
 * Get the text for a text ID, consulting the prefetch dump first.
 *
 * When a prefetch source is set, it is asked for the text of the current
 * page/revision ($this->thisPage, $this->thisRev). A non-null answer counts
 * as a prefetch hit only if its byte length equals rev_len of that revision
 * in the database. Otherwise the text comes from doGetText().
 *
 * NOTE(review): the statements following the prefetchCount increment are
 * not visible in this excerpt.
 *
 * @param $id Mixed: text ID, passed to doGetText()
 */
315 function getText( $id ) {
317 if ( isset( $this->prefetch
) ) {
318 $text = $this->prefetch
->prefetch( $this->thisPage
, $this->thisRev
);
319 if ( $text !== null ) { // non-null: the prefetch dump had an entry (null = entry missing)
320 $dbr = wfGetDB( DB_SLAVE
);
321 $revID = intval( $this->thisRev
);
322 $revLength = $dbr->selectField( 'revision', 'rev_len', array( 'rev_id' => $revID ) );
323 // if length of rev text in file doesn't match length in db, we reload
324 // this avoids carrying forward broken data from previous xml dumps
325 if( strlen( $text ) == $revLength ) {
326 $this->prefetchCount++
;
331 return $this->doGetText( $id );
/**
 * Fetch revision text, with failure accounting.
 *
 * The text comes from getTextSpawned() when $this->spawn is set, otherwise
 * from getTextDbSafe(). A result of false is a failure: once
 * $this->failures exceeds $this->maxFailures the method reports that it is
 * giving up and counts the revision in $this->failedTextRetrievals (compared
 * against $this->maxConsecutiveFailedTextRetrievals); otherwise it reports
 * the error and sleeps $this->failureTimeout seconds before a retry.
 * $this->failedTextRetrievals is reset to 0 in the remaining visible branch.
 *
 * NOTE(review): the loop structure, the updates of $this->failures and all
 * return/throw statements are not visible in this excerpt.
 *
 * @param $id Mixed: text ID
 */
334 private function doGetText( $id ) {
338 $ex = new MWException( "Graceful storage failure" );
340 if ( $this->spawn
) {
341 if ($this->failures
) {
342 // we don't know why it failed, could be the child process
343 // borked, could be db entry busted, could be db server out to lunch,
344 // so cover all bases
348 $text = $this->getTextSpawned( $id );
350 $text = $this->getTextDbSafe( $id );
352 if ( $text === false ) {
354 if ( $this->failures
> $this->maxFailures
) {
355 $this->progress( "Failed to retrieve revision text for text id ".
356 "$id after $this->maxFailures tries, giving up" );
357 // were there so many bad retrievals in a row we want to bail?
358 // at some point we have to declare the dump irretrievably broken
359 $this->failedTextRetrievals++
;
360 if ($this->failedTextRetrievals
> $this->maxConsecutiveFailedTextRetrievals
) {
364 // would be nice to return something better to the caller someday,
365 // log what we know about the failure and about the revision
369 $this->progress( "Error $this->failures " .
370 "of allowed $this->maxFailures retrieving revision text for text id $id! " .
371 "Pausing $this->failureTimeout seconds before retry..." );
372 sleep( $this->failureTimeout
);
375 $this->failedTextRetrievals
= 0;
383 * Fetch a text revision from the database, retrying in case of failure.
384 * This may survive some transitory errors by reconnecting, but
385 * may not survive a long-term server outage.
387 private function getTextDbSafe( $id ) {
// Wraps getTextDb() and catches DBQueryError only.
// NOTE(review): what happens after a caught error is not visible in this excerpt.
390 $text = $this->getTextDb( $id );
391 } catch ( DBQueryError
$ex ) {
399 * May throw a database error if, say, the server dies during query.
401 private function getTextDb( $id ) {
// Loads old_text and old_flags of the 'text' row whose old_id is $id and
// expands it with Revision::getRevisionText().
403 $row = $this->db
->selectRow( 'text',
404 array( 'old_text', 'old_flags' ),
405 array( 'old_id' => $id ),
407 $text = Revision
::getRevisionText( $row );
408 if ( $text === false ) {
// Same clean-up as in getTextSpawnedOnce(): drop carriage returns, then
// normalize through $wgContLang.
411 $stripped = str_replace( "\r", "", $text );
412 $normalized = $wgContLang->normalize( $stripped );
416 private function getTextSpawned( $id ) {
// Fetches the text through the helper subprocess with warnings suppressed.
// NOTE(review): the body of the !spawnProc branch and the statements after
// the fetch are not visible in this excerpt.
417 wfSuppressWarnings();
418 if ( !$this->spawnProc
) {
422 $text = $this->getTextSpawnedOnce( $id );
/**
 * Start the maintenance/fetchText.php helper for the current wiki with
 * proc_open() and keep its handles on this object.
 *
 * The command arguments are escaped with wfEscapeShellArg. The child gets a
 * pipe for stdin and one for stdout; its stderr is appended to /dev/null.
 * A failed spawn is reported through progress().
 *
 * NOTE(review): the start of the command line, the assignment of the pipe
 * handles and the return values are not visible in this excerpt.
 */
427 function openSpawn() {
431 array_map( 'wfEscapeShellArg',
434 "$IP/maintenance/fetchText.php",
435 '--wiki', wfWikiID() ) ) );
437 0 => array( "pipe", "r" ),
438 1 => array( "pipe", "w" ),
439 2 => array( "file", "/dev/null", "a" ) );
442 $this->progress( "Spawning database subprocess: $cmd" );
443 $this->spawnProc
= proc_open( $cmd, $spec, $pipes );
444 if ( !$this->spawnProc
) {
446 $this->progress( "Subprocess spawn failed." );
450 $this->spawnWrite
, // -> stdin
451 $this->spawnRead
, // <- stdout
/**
 * Shut down the text-fetching subprocess, if there is one, and reset every
 * spawn handle to false.
 *
 * The pipes are closed before the process, in the order read, write, error.
 * Warnings are suppressed for the duration, so closing handles that have
 * already gone away stays silent.
 */
private function closeSpawn() {
	wfSuppressWarnings();

	if ( $this->spawnRead ) {
		fclose( $this->spawnRead );
	}
	$this->spawnRead = false;

	if ( $this->spawnWrite ) {
		fclose( $this->spawnWrite );
	}
	$this->spawnWrite = false;

	if ( $this->spawnErr ) {
		fclose( $this->spawnErr );
	}
	$this->spawnErr = false;

	if ( $this->spawnProc ) {
		// The handle comes from proc_open(), so it has to be released with
		// proc_close(); pclose() only accepts handles created by popen().
		proc_close( $this->spawnProc );
	}
	$this->spawnProc = false;

	// Undo the suppression enabled at the top of this method.
	wfRestoreWarnings();
}
/**
 * Make a single attempt to get a text from the helper subprocess.
 *
 * Exchange visible here: "$id\n" is written to the child and flushed; the
 * first line read back is the ID the child is answering for and is compared
 * with $id; the next line is the byte length (negative means an error); the
 * text is then read in a loop until that many bytes have arrived. A length
 * mismatch is reported through progress(). The text finally has carriage
 * returns removed and is normalized through $wgContLang.
 *
 * NOTE(review): the handling of an ID mismatch, the accumulation of $buffer
 * into $text and the final return are not visible in this excerpt.
 *
 * @param $id Mixed: text ID to request
 * @return false on the visible write, flush, read and negative-length failures
 */
474 private function getTextSpawnedOnce( $id ) {
477 $ok = fwrite( $this->spawnWrite
, "$id\n" );
478 // $this->progress( ">> $id" );
479 if ( !$ok ) return false;
481 $ok = fflush( $this->spawnWrite
);
482 // $this->progress( ">> [flush]" );
483 if ( !$ok ) return false;
485 // check that the text id they are sending is the one we asked for
486 // this avoids out of sync revision text errors we have encountered in the past
487 $newId = fgets( $this->spawnRead
);
488 if ( $newId === false ) {
491 if ( $id != intval( $newId ) ) {
495 $len = fgets( $this->spawnRead
);
496 // $this->progress( "<< " . trim( $len ) );
497 if ( $len === false ) return false;
499 $nbytes = intval( $len );
500 // actual error, not zero-length text
501 if ($nbytes < 0 ) return false;
505 // Subprocess may not send everything at once, we have to loop.
506 while ( $nbytes > strlen( $text ) ) {
507 $buffer = fread( $this->spawnRead
, $nbytes - strlen( $text ) );
508 if ( $buffer === false ) break;
512 $gotbytes = strlen( $text );
513 if ( $gotbytes != $nbytes ) {
514 $this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
518 // Do normalization in the dump thread...
519 $stripped = str_replace( "\r", "", $text );
520 $normalized = $wgContLang->normalize( $stripped );
/**
 * XML parser callback for an opening tag.
 *
 * Flushes any pending open element, remembers the tag name in
 * $this->lastName and, for 'revision' and 'page', records it in
 * $this->state and hands the buffered XML to $this->egress (page header
 * before the first revision, stream header before the first page).
 * A "text" element carrying an 'id' attribute has its text fetched with
 * getText(); it is re-emitted with only xml:space="preserve" and the
 * fetched text is injected through characterData() when non-empty.
 * In the remaining visible assignment the element keeps its attributes.
 *
 * NOTE(review): several statements (e.g. after the egress writes) are not
 * visible in this excerpt.
 *
 * @param $parser resource: the XML parser
 * @param $name String: element name
 * @param $attribs Array: element attributes
 */
524 function startElement( $parser, $name, $attribs ) {
525 $this->checkpointJustWritten
= false;
527 $this->clearOpenElement( null );
528 $this->lastName
= $name;
530 if ( $name == 'revision' ) {
531 $this->state
= $name;
532 $this->egress
->writeOpenPage( null, $this->buffer
);
534 } elseif ( $name == 'page' ) {
535 $this->state
= $name;
536 if ( $this->atStart
) {
537 $this->egress
->writeOpenStream( $this->buffer
);
539 $this->atStart
= false;
543 if ( $name == "text" && isset( $attribs['id'] ) ) {
544 $text = $this->getText( $attribs['id'] );
545 $this->openElement
= array( $name, array( 'xml:space' => 'preserve' ) );
546 if ( strlen( $text ) > 0 ) {
547 $this->characterData( $parser, $text );
550 $this->openElement
= array( $name, $attribs );
/**
 * XML parser callback for a closing tag.
 *
 * Either flushes a still-pending open element (passing "" as the style) or
 * appends "</$name>" to the buffer. Then, by tag:
 *  - 'revision': the buffer is handed to egress->writeRevision();
 *  - 'page': the first and last page IDs written are tracked. If the time
 *    limit was exceeded, the page and the stream are closed, the output
 *    files are renamed to checkpoint names built from those IDs (padded to
 *    9 digits with zeros) and reopened, a new stream header is buffered and
 *    the checkpoint state is reset. Otherwise the page is simply closed;
 *  - 'mediawiki': the stream is closed.
 *
 * NOTE(review): some statements (e.g. buffer resets) are not visible in
 * this excerpt.
 * NOTE(review): $checkpointFilenames is assigned but not used in the
 * visible lines.
 *
 * @param $parser resource: the XML parser
 * @param $name String: element name
 */
554 function endElement( $parser, $name ) {
555 $this->checkpointJustWritten
= false;
557 if ( $this->openElement
) {
558 $this->clearOpenElement( "" );
560 $this->buffer
.= "</$name>";
563 if ( $name == 'revision' ) {
564 $this->egress
->writeRevision( null, $this->buffer
);
567 } elseif ( $name == 'page' ) {
568 if (! $this->firstPageWritten
) {
569 $this->firstPageWritten
= trim($this->thisPage
);
571 $this->lastPageWritten
= trim($this->thisPage
);
572 if ($this->timeExceeded
) {
573 $this->egress
->writeClosePage( $this->buffer
);
574 // nasty hack, we can't just write the chardata after the
575 // page tag, it will include leading blanks from the next line
576 $this->egress
->sink
->write("\n");
578 $this->buffer
= $this->xmlwriterobj
->closeStream();
579 $this->egress
->writeCloseStream( $this->buffer
);
582 $this->thisPage
= "";
583 // this could be more than one file if we had more than one output arg
584 $checkpointFilenames = array();
585 $filenameList = (array)$this->egress
->getFilenames();
586 $newFilenames = array();
587 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
588 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
589 for ( $i = 0; $i < count( $filenameList ); $i++
) {
590 $checkpointNameFilledIn = sprintf( $this->checkpointFiles
[$i], $firstPageID, $lastPageID );
591 $fileinfo = pathinfo($filenameList[$i]);
592 $newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
594 $this->egress
->closeRenameAndReopen( $newFilenames );
595 $this->buffer
= $this->xmlwriterobj
->openStream();
596 $this->timeExceeded
= false;
597 $this->timeOfCheckpoint
= $this->lastTime
;
598 $this->firstPageWritten
= false;
599 $this->checkpointJustWritten
= true;
602 $this->egress
->writeClosePage( $this->buffer
);
604 $this->thisPage
= "";
607 } elseif ( $name == 'mediawiki' ) {
608 $this->egress
->writeCloseStream( $this->buffer
);
/**
 * XML parser callback for character data; startElement() also calls it to
 * inject fetched revision text.
 *
 * Text inside an <id> element is accumulated into $this->thisRev or
 * $this->thisPage, depending on whether a revision or a page is currently
 * open. All data is then appended, HTML-escaped, to the output buffer.
 *
 * @param $parser resource: the XML parser (unused)
 * @param $data String: raw character data
 */
function characterData( $parser, $data ) {
	$this->clearOpenElement( null );

	if ( $this->lastName == "id" ) {
		if ( $this->state == "revision" ) {
			$this->thisRev .= $data;
		} elseif ( $this->state == "page" ) {
			$this->thisPage .= $data;
		}
	}

	// Right after a checkpoint, endElement() has already written the newline
	// that follows the closing page tag, so drop the one the parser delivers
	// next. Nasty hack!
	if ( $this->checkpointJustWritten ) {
		// isset() guards against an empty string, where reading $data[0]
		// would raise a notice and foul up the XML output.
		if ( isset( $data[0] ) && $data[0] === "\n" ) {
			$data = substr( $data, 1 );
		}
		$this->checkpointJustWritten = false;
	}

	$this->buffer .= htmlspecialchars( $data );
}
/**
 * Write out the element remembered in $this->openElement, if any, and
 * forget it.
 *
 * @param $style Mixed: passed as the third argument to Xml::element();
 *   callers in this class pass null or ""
 */
function clearOpenElement( $style ) {
	if ( !$this->openElement ) {
		// Nothing is pending.
		return;
	}

	list( $tagName, $tagAttribs ) = $this->openElement;
	$this->buffer .= Xml::element( $tagName, $tagAttribs, $style );
	$this->openElement = false;
}
642 $dumper = new TextPassDumper( $argv );
644 if ( !isset( $options['help'] ) ) {
645 $dumper->dump( true );
647 $dumper->progress( <<<ENDS
648 This script postprocesses XML dumps from dumpBackup.php to add
649 page text which was stubbed out (using --stub).
651 XML input is accepted on stdin.
652 XML output is sent to stdout; progress reports are sent to stderr.
654 Usage: php dumpTextPass.php [<options>]
656 --stub=<type>:<file> To load a compressed stub dump instead of stdin
657 --prefetch=<type>:<file> Use a prior dump file as a text source, to save
658 pressure on the database.
659 (Requires the XMLReader extension)
660 --maxtime=<minutes> Write out checkpoint file after this many minutes (writing
661 out complete page, closing xml file properly, and opening new one
662 with header). This option requires the checkpointfile option.
663 --checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
664 substituting first pageid written for the first %s (required) and the
665 last pageid written for the second %s if it exists.
666 --quiet Don't dump status reports to stderr.
667 --report=n Report position and speed after every n pages processed.
669 --server=h Force reading from MySQL server h
670 --current Base ETA on number of pages in database instead of all revisions
671 --spawn Spawn a subprocess for loading text records
672 --help Display this help message