3 * Script that postprocesses XML dumps from dumpBackup.php to add page text
5 * Copyright (C) 2005 Brion Vibber <brion@pobox.com>
6 * http://www.mediawiki.org/
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21 * http://www.gnu.org/copyleft/gpl.html
24 * @ingroup Maintenance
// Remember the working directory the script was started from, before any
// of the includes below run.
27 $originalDir = getcwd();
// Load the command-line environment that lives next to this file, then the
// backup definitions (presumably where BackupDumper is declared -- verify).
29 require_once( dirname( __FILE__
) . '/commandLine.inc' );
30 require_once( 'backup.inc' );
33 * @ingroup Maintenance
// Second-pass dumper: reads a stub XML dump and fills in revision text
// (see startElement(), which calls getText() for <text id="..."> elements).
35 class TextPassDumper
extends BackupDumper
{
// Stream that dump() opens with fopen() and hands to readDump().
37 var $input = "php://stdin";
// History mode; dump() passes it to initProgress(), processOption() may change it.
38 var $history = WikiExporter
::FULL
;
// Prefetch/fetch counters used by showReport() to compute the prefetch ratio.
// $prefetchCount is incremented in getText() when a prefetched text passes
// the length check.
40 var $prefetchCount = 0;
41 var $prefetchCountLast = 0;
42 var $fetchCountLast = 0;
// getText() throws once more than this many calls in a row have failed.
45 var $maxConsecutiveFailedTextRetrievals = 200;
46 var $failureTimeout = 5; // Seconds to sleep after db failure
// Subprocess handle (from proc_open() in openSpawn()) and its pipes;
// all four are released and reset to false in closeSpawn().
50 var $spawnProc = false;
51 var $spawnWrite = false;
52 var $spawnRead = false;
53 var $spawnErr = false;
// XmlDumpWriter instance created in dump(); used by endElement() to produce
// the stream footer/header around checkpoint files.
55 var $xmlwriterobj = false;
57 // when we spend more than maxTimeAllowed seconds on this run, we continue
58 // processing until we write out the next complete page, then save output file(s),
59 // rename it/them and open new one(s)
60 var $maxTimeAllowed = 0; // 0 = no limit
// Set by setTimeExceeded(); cleared in endElement() after a checkpoint.
61 var $timeExceeded = false;
// Page ids (or false) substituted into the checkpoint file name patterns.
62 var $firstPageWritten = false;
63 var $lastPageWritten = false;
// True right after a checkpoint was written; characterData() uses it to
// drop one leading newline.
64 var $checkpointJustWritten = false;
// File name patterns collected from the 'checkpointfile' option.
65 var $checkpointFiles = array();
74 * Drop the database connection $this->db and try to get a new one.
76 * This function tries to get a /different/ connection if this is
77 * possible. Hence, (if this is possible) it switches to a different
78 * failover upon each call.
80 * This function resets $this->lb and closes all connections on it.
85 // Cleaning up old connections
86 if ( isset( $this->lb
) ) {
87 $this->lb
->closeAll();
91 if ( isset( $this->db
) && $this->db
->isOpen() )
93 throw new MWException( 'DB is set and has not been closed by the Load Balancer' );
99 // Trying to set up new connection.
100 // We do /not/ retry upon failure, but delegate to encapsulating logic, to avoid
101 // individually retrying at different layers of code.
103 // 1. The LoadBalancer.
105 $this->lb
= wfGetLBFactory()->newMainLB();
106 } catch (Exception
$e) {
107 throw new MWException( __METHOD__
. " rotating DB failed to obtain new load balancer (" . $e->getMessage() . ")" );
111 // 2. The Connection, through the load balancer.
113 $this->db
= $this->lb
->getConnection( DB_SLAVE
, 'backup' );
114 } catch (Exception
$e) {
115 throw new MWException( __METHOD__
. " rotating DB failed to obtain new database (" . $e->getMessage() . ")" );
118 assert( 'isset( $this->lb ) && isset( $this->db ) && $this->db->isOpen() /* rotating the DB worked */' );
/**
 * Initialises progress tracking through the parent implementation and
 * starts the checkpoint timer.
 *
 * @param $history Not referenced in the visible body.
 */
122 function initProgress( $history ) {
123 parent
::initProgress();
// The checkpoint clock starts at $this->startTime (presumably set by the
// parent call above -- verify).
124 $this->timeOfCheckpoint
= $this->startTime
;
/**
 * Runs the text pass: sets up progress reporting and the output filter,
 * validates the checkpoint options, opens $this->input and feeds it to
 * readDump(), then emits a final report.
 *
 * Neither parameter is referenced in the visible body; the history mode
 * actually used is $this->history.
 *
 * @param $history
 * @param $text
 * @throws MWException When readDump() returns a WikiError.
 */
127 function dump( $history, $text = WikiExporter
::TEXT
) {
128 // This shouldn't happen if on console... ;)
129 header( 'Content-type: text/html; charset=UTF-8' );
131 // Notice messages will foul up your XML output even if they're
132 // relatively harmless.
133 if ( ini_get( 'display_errors' ) )
134 ini_set( 'display_errors', 'stderr' );
136 $this->initProgress( $this->history
);
138 // We are trying to get an initial database connection to avoid that the
139 // first try of this request's first call to getText fails. However, if
140 // obtaining a good DB connection fails it's not a serious issue, as
141 // getText does retry upon failure and can start without having a working
145 } catch (Exception
$e) {
146 // We do not even count this as failure. Just let eventual
148 $this->progress( "Getting initial DB connection failed (" .
149 $e->getMessage() . ")" );
// All output goes through a progress-reporting filter wrapped around $this->sink.
152 $this->egress
= new ExportProgressFilter( $this->sink
, $this );
154 // it would be nice to do it in the constructor, oh well. need egress set
155 $this->finalOptionCheck();
157 // we only want this so we know how to close a stream :-P
158 $this->xmlwriterobj
= new XmlDumpWriter();
// NOTE(review): the fopen() result is passed on without a check in the
// lines visible here.
160 $input = fopen( $this->input
, "rt" );
161 $result = $this->readDump( $input );
// readDump() reports parse failures by returning an error object; turn that
// into an exception.
163 if ( WikiError
::isError( $result ) ) {
164 throw new MWException( $result->getMessage() );
167 if ( $this->spawnProc
) {
171 $this->report( true );
/**
 * Handles one command-line option of this dumper.
 *
 * The option dispatch itself is only partly visible here. The visible
 * branches resolve file options through processFileOpt(), set up the
 * prefetch source, convert the time limit from minutes to seconds, collect
 * checkpoint file patterns and select the history mode.
 *
 * @param $opt
 * @param $val
 * @param $param
 */
174 function processOption( $opt, $val, $param ) {
176 $url = $this->processFileOpt( $val, $param );
180 require_once "$IP/maintenance/backupPrefetch.inc";
181 $this->prefetch
= new BaseDump( $url );
// The value is multiplied by 60 (minutes to seconds); readDump() treats 0 as
// "no limit".
187 $this->maxTimeAllowed
= intval($val)*60;
189 case 'checkpointfile':
// May be given several times; finalOptionCheck() expects one per output file.
190 $this->checkpointFiles
[] = $val;
193 $this->history
= WikiExporter
::CURRENT
;
196 $this->history
= WikiExporter
::FULL
;
/**
 * Translates a ';'-separated list of file names into stream URIs.
 *
 * Depending on a selector that is not visible in these lines, each entry is
 * prefixed with the zlib, bzip2 or 7z stream wrapper; the results are joined
 * with ';' again.
 *
 * @param $val   Overwritten with the joined URI list.
 * @param $param ';'-separated file list.
 */
207 function processFileOpt( $val, $param ) {
208 $fileURIs = explode(';',$param);
209 foreach ( $fileURIs as $URI ) {
215 $newURI = "compress.zlib://$URI";
218 $newURI = "compress.bzip2://$URI";
221 $newURI = "mediawiki.compress.7z://$URI";
// NOTE(review): $newFileURIs is not initialised in the visible lines.
226 $newFileURIs[] = $newURI;
228 $val = implode( ';', $newFileURIs );
233 * Overridden to include prefetch ratio if enabled.
/**
 * Progress report that additionally shows the prefetch hit ratio.
 *
 * Without a prefetch source the parent report is used. Otherwise, when
 * reporting is enabled, overall and since-last-report rates for pages,
 * revisions and prefetch hits are computed, one line is emitted through
 * progress(), and the "last" counters are advanced.
 */
235 function showReport() {
236 if ( !$this->prefetch
) {
237 parent
::showReport();
241 if ( $this->reporting
) {
242 $now = wfTimestamp( TS_DB
);
// Elapsed seconds since the start of the run and since the previous report.
244 $deltaAll = wfTime() - $this->startTime
;
245 $deltaPart = wfTime() - $this->lastTime
;
// NOTE(review): $this->pageCountLast is not advanced anywhere in the visible
// lines of this method, unlike revCountLast below -- confirm it is updated
// elsewhere, otherwise the "curr" page rate is cumulative.
246 $this->pageCountPart
= $this->pageCount
- $this->pageCountLast
;
247 $this->revCountPart
= $this->revCount
- $this->revCountLast
;
// ETA: extrapolate the elapsed time by the fraction of revisions done.
250 $portion = $this->revCount
/ $this->maxCount
;
251 $eta = $this->startTime +
$deltaAll / $portion;
252 $etats = wfTimestamp( TS_DB
, intval( $eta ) );
// Overall prefetch hit ratio in percent.
253 if ( $this->fetchCount
) {
254 $fetchRate = 100.0 * $this->prefetchCount
/ $this->fetchCount
;
258 $pageRate = $this->pageCount
/ $deltaAll;
259 $revRate = $this->revCount
/ $deltaAll;
// Prefetch hit ratio since the previous report.
267 if ( $this->fetchCountLast
) {
268 $fetchRatePart = 100.0 * $this->prefetchCountLast
/ $this->fetchCountLast
;
// NOTE(review): '-' is later formatted with %0.1f below, so it will not be
// printed as a dash -- confirm intent.
270 $fetchRatePart = '-';
272 $pageRatePart = $this->pageCountPart
/ $deltaPart;
273 $revRatePart = $this->revCountPart
/ $deltaPart;
276 $fetchRatePart = '-';
280 $this->progress( sprintf( "%s: %s (ID %d) %d pages (%0.1f|%0.1f/sec all|curr), %d revs (%0.1f|%0.1f/sec all|curr), %0.1f%%|%0.1f%% prefetched (all|curr), ETA %s [max %d]",
281 $now, wfWikiID(), $this->ID
, $this->pageCount
, $pageRate, $pageRatePart, $this->revCount
, $revRate, $revRatePart, $fetchRate, $fetchRatePart, $etats, $this->maxCount
) );
// NOTE(review): $nowts is not assigned in the visible lines of this method.
282 $this->lastTime
= $nowts;
283 $this->revCountLast
= $this->revCount
;
284 $this->prefetchCountLast
= $this->prefetchCount
;
285 $this->fetchCountLast
= $this->fetchCount
;
/**
 * Flags that the time limit was exceeded; endElement() acts on the flag
 * when the current page is closed.
 */
289 function setTimeExceeded() {
290 $this->timeExceeded
= True;
/**
 * Tests whether a time limit is configured and more than maxTimeAllowed
 * seconds lie between the last checkpoint and $this->lastTime.
 *
 * readDump() uses the result as a boolean; the return statements themselves
 * are not visible in these lines.
 */
293 function checkIfTimeExceeded() {
294 if ( $this->maxTimeAllowed
&& ( $this->lastTime
- $this->timeOfCheckpoint
> $this->maxTimeAllowed
) ) {
/**
 * Validates the combination of the checkpointfile and maxtime options.
 *
 * Requires $this->egress to be set (dump() calls this right after creating
 * it).
 *
 * @throws MWException When only one of the two options is given, when a
 *   checkpoint pattern fails the '%s' placeholder count check, or when the
 *   number of patterns differs from the number of output files.
 */
300 function finalOptionCheck() {
// Either both options are given or neither.
301 if ( ( $this->checkpointFiles
&& ! $this->maxTimeAllowed
) ||
302 ( $this->maxTimeAllowed
&& !$this->checkpointFiles
) ) {
303 throw new MWException("Options checkpointfile and maxtime must be specified together.\n");
// Each pattern is later filled via sprintf() with the first and last page id.
305 foreach ($this->checkpointFiles
as $checkpointFile) {
306 $count = substr_count ( $checkpointFile,"%s" );
308 throw new MWException("Option checkpointfile must contain two '%s' for substitution of first and last pageids, count is $count instead, file is $checkpointFile.\n");
// Patterns are matched to output files by position, so the counts must agree.
312 if ( $this->checkpointFiles
) {
313 $filenameList = (array)$this->egress
->getFilenames();
314 if ( count( $filenameList ) != count( $this->checkpointFiles
) ) {
315 throw new MWException("One checkpointfile must be specified for each output option, if maxtime is used.\n");
/**
 * Streams the stub dump through an XML parser in 512 KiB chunks.
 *
 * startElement(), endElement() and characterData() are registered as the
 * parser callbacks and do the actual rewriting. When a time limit is
 * configured, the output written since the last checkpoint is renamed to
 * its checkpoint name once the input is exhausted.
 *
 * @param $input Readable stream handle (dump() passes an fopen() result).
 * @return A WikiXmlError on a parse failure; the success return value is
 *   not visible in these lines.
 */
320 function readDump( $input ) {
// Reset the per-run parser state used by the callbacks.
322 $this->openElement
= false;
323 $this->atStart
= true;
325 $this->lastName
= "";
329 $parser = xml_parser_create( "UTF-8" );
// Keep element names as written instead of upper-casing them; the callbacks
// compare against lower-case names such as 'page' and 'revision'.
330 xml_parser_set_option( $parser, XML_OPTION_CASE_FOLDING
, false );
332 xml_set_element_handler( $parser, array( &$this, 'startElement' ), array( &$this, 'endElement' ) );
333 xml_set_character_data_handler( $parser, array( &$this, 'characterData' ) );
335 $offset = 0; // for context extraction on error reporting
336 $bufferSize = 512 * 1024;
// The limit is only checked between chunks; the checkpoint itself is written
// by endElement() at the next page end.
338 if ($this->checkIfTimeExceeded()) {
339 $this->setTimeExceeded();
341 $chunk = fread( $input, $bufferSize );
// The third argument tells the parser whether this is the final chunk.
342 if ( !xml_parse( $parser, $chunk, feof( $input ) ) ) {
343 wfDebug( "TextDumpPass::readDump encountered XML parsing error\n" );
344 return new WikiXmlError( $parser, 'XML import parse failure', $chunk, $offset );
346 $offset +
= strlen( $chunk );
347 } while ( $chunk !== false && !feof( $input ) );
348 if ($this->maxTimeAllowed
) {
349 $filenameList = (array)$this->egress
->getFilenames();
350 // we wrote some stuff after last checkpoint that needs renamed
351 if (file_exists($filenameList[0])) {
352 $newFilenames = array();
353 # we might have just written the header and footer and had no
354 # pages or revisions written... perhaps they were all deleted
355 # there's no pageID 0 so we use that. the caller is responsible
356 # for deciding what to do with a file containing only the
357 # siteinfo information and the mw tags.
358 if (! $this->firstPageWritten
) {
359 $firstPageID = str_pad(0,9,"0",STR_PAD_LEFT
);
360 $lastPageID = str_pad(0,9,"0",STR_PAD_LEFT
);
// Page ids are zero-padded to 9 digits before substitution.
363 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
364 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
// Build the final name for each output file, keeping its directory.
366 for ( $i = 0; $i < count( $filenameList ); $i++
) {
367 $checkpointNameFilledIn = sprintf( $this->checkpointFiles
[$i], $firstPageID, $lastPageID );
368 $fileinfo = pathinfo($filenameList[$i]);
369 $newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
371 $this->egress
->closeAndRename( $newFilenames );
374 xml_parser_free( $parser );
380 * Tries to get the revision text for a revision id.
382 * Upon errors, retries (Up to $this->maxFailures tries each call).
383 * If no good revision text can be obtained even after retrying, "" is returned.
384 * If no good revision text could be returned for
385 * $this->maxConsecutiveFailedTextRetrievals consecutive calls to getText, MWException
388 * @param $id string The revision id to get the text for
390 * @return string The revision text for $id, or ""
391 * @throws MWException
// Fetches the text for text id $id, trying the prefetch source first (once)
// and then the database (directly or via the spawned subprocess). A candidate
// is accepted only if its length equals rev_len of the current revision.
// Failed attempts are reported through progress() and retried while
// $failures < $this->maxFailures, sleeping $this->failureTimeout seconds in
// between. Throws MWException once more than
// $this->maxConsecutiveFailedTextRetrievals calls in a row have failed.
393 function getText( $id ) {
394 $prefetchNotTried = true; // Whether or not we already tried to get the text via prefetch.
395 $text = false; // The candidate for a good text. false if no proper value.
396 $failures = 0; // The number of times, this invocation of getText already failed.
398 static $consecutiveFailedTextRetrievals = 0; // The number of times getText failed without
399 // yielding a good text in between.
403 // To be able to simply return on success without having to worry about bookkeeping,
404 // we assume that this fetch works (possibly after some retries). Nevertheless, we keep
405 // the old value, so we can restore it, if problems occur (See after the while loop).
406 $oldConsecutiveFailedTextRetrievals = $consecutiveFailedTextRetrievals;
407 $consecutiveFailedTextRetrievals = 0;
409 while ( $failures < $this->maxFailures
) {
411 // As soon as we found a good text for the $id, we will return immediately.
412 // Hence, if we make it past the try catch block, we know that we did not
416 // Step 1: Get some text (or reuse from previous iteration if checking
417 // for plausibility failed)
419 // Trying to get prefetch, if it has not been tried before
420 if ( $text === false && isset( $this->prefetch
) && $prefetchNotTried ) {
421 $prefetchNotTried = false;
422 $tryIsPrefetch = true;
423 $text = $this->prefetch
->prefetch( $this->thisPage
, $this->thisRev
);
424 if ( $text === null ) {
429 if ( $text === false ) {
430 // Fallback to asking the database
431 $tryIsPrefetch = false;
432 if ( $this->spawn
) {
433 $text = $this->getTextSpawned( $id );
435 $text = $this->getTextDb( $id );
439 if ( $text === false ) {
440 throw new MWException( "Generic error while obtaining text for id " . $id );
443 // We received a good candidate for the text of $id via some method
445 // Step 2: Checking for plausibility and return the text if it is
// Plausibility means: byte length equals the rev_len stored for the revision
// currently being processed.
447 $revID = intval( $this->thisRev
);
448 if ( ! isset( $this->db
) ) {
449 throw new MWException( "No database available" );
451 $revLength = $this->db
->selectField( 'revision', 'rev_len', array( 'rev_id' => $revID ) );
452 if( strlen( $text ) == $revLength ) {
453 if ( $tryIsPrefetch ) {
454 $this->prefetchCount++
;
460 throw new MWException( "Received text is unplausible for id " . $id );
462 } catch (Exception
$e) {
463 $msg = "getting/checking text " . $id . " failed (".$e->getMessage().")";
464 if ( $failures +
1 < $this->maxFailures
) {
465 $msg .= " (Will retry " . ( $this->maxFailures
- $failures - 1) . " more times)";
467 $this->progress( $msg );
470 // Something went wrong; we did not get a text that was plausible :(
474 // After backing off for some time, we try to reboot the whole process as
475 // much as possible to not carry over failures from one part to the other
477 sleep( $this->failureTimeout
);
480 if ( $this->spawn
) {
484 } catch (Exception
$e) {
485 $this->progress( "Rebooting getText infrastructure failed (".$e->getMessage().")" .
486 " Trying to continue anyways" );
490 // Retrieving a good text for $id failed (at least) maxFailures times.
491 // We abort for this $id.
493 // Restoring the consecutive failures, and maybe aborting, if the dump
495 $consecutiveFailedTextRetrievals = $oldConsecutiveFailedTextRetrievals +
1;
496 if ( $consecutiveFailedTextRetrievals > $this->maxConsecutiveFailedTextRetrievals
) {
497 throw new MWException( "Graceful storage failure" );
505 * May throw a database error if, say, the server dies during query.
507 * @return bool|string
508 * @throws MWException
// Loads the text row with old_id = $id through $this->db, decodes it with
// Revision::getRevisionText(), strips carriage returns and normalizes the
// result with the content language object.
// Throws MWException when $this->db is not set.
510 private function getTextDb( $id ) {
512 if ( ! isset( $this->db
) ) {
// NOTE(review): there is no separator between __METHOD__ and the message text.
513 throw new MWException( __METHOD__
. "No database available" );
515 $row = $this->db
->selectRow( 'text',
516 array( 'old_text', 'old_flags' ),
517 array( 'old_id' => $id ),
519 $text = Revision
::getRevisionText( $row );
520 if ( $text === false ) {
// Same post-processing as in getTextSpawnedOnce(): drop "\r", then normalize.
523 $stripped = str_replace( "\r", "", $text );
524 $normalized = $wgContLang->normalize( $stripped );
// Fetches the text for $id through the helper subprocess with warnings
// suppressed. The branch taken when no subprocess is running yet is not
// visible in these lines (presumably it opens one -- verify).
528 private function getTextSpawned( $id ) {
529 wfSuppressWarnings();
530 if ( !$this->spawnProc
) {
534 $text = $this->getTextSpawnedOnce( $id );
// Starts the text-fetching subprocess with proc_open() and stores its handle
// in $this->spawnProc. The command is built from shell-escaped arguments and
// goes through multiversion/MWScript.php when that file exists next to $IP,
// otherwise it runs maintenance/fetchText.php directly. The child's stdin and
// stdout are pipes; its stderr is appended to /dev/null.
539 function openSpawn() {
542 if ( file_exists( "$IP/../multiversion/MWScript.php" ) ) {
544 array_map( 'wfEscapeShellArg',
547 "$IP/../multiversion/MWScript.php",
549 '--wiki', wfWikiID() ) ) );
553 array_map( 'wfEscapeShellArg',
556 "$IP/maintenance/fetchText.php",
557 '--wiki', wfWikiID() ) ) );
560 0 => array( "pipe", "r" ),
561 1 => array( "pipe", "w" ),
562 2 => array( "file", "/dev/null", "a" ) );
565 $this->progress( "Spawning database subprocess: $cmd" );
566 $this->spawnProc
= proc_open( $cmd, $spec, $pipes );
567 if ( !$this->spawnProc
) {
569 $this->progress( "Subprocess spawn failed." );
573 $this->spawnWrite
, // -> stdin
574 $this->spawnRead
, // <- stdout
// Closes the subprocess pipes and the process handle (each only if set) and
// resets all four members to false, with warnings suppressed.
580 private function closeSpawn() {
581 wfSuppressWarnings();
582 if ( $this->spawnRead
)
583 fclose( $this->spawnRead
);
584 $this->spawnRead
= false;
585 if ( $this->spawnWrite
)
586 fclose( $this->spawnWrite
);
587 $this->spawnWrite
= false;
// NOTE(review): no assignment to spawnErr is visible in this file view;
// openSpawn() sends the child's stderr to /dev/null instead of a pipe.
588 if ( $this->spawnErr
)
589 fclose( $this->spawnErr
);
590 $this->spawnErr
= false;
// NOTE(review): spawnProc comes from proc_open() in openSpawn(); PHP documents
// proc_close() for such handles, while pclose() is meant for popen() streams.
// Likely should be proc_close() -- confirm and fix.
591 if ( $this->spawnProc
)
592 pclose( $this->spawnProc
);
593 $this->spawnProc
= false;
// One request/response round trip with the helper subprocess.
// Protocol as visible here: write "$id\n" to the child's stdin and flush;
// read back a line carrying the id, then a line carrying the byte count,
// then that many bytes of text. Returns false on any write/read failure or a
// negative byte count; otherwise the text is stripped of "\r" and normalized.
597 private function getTextSpawnedOnce( $id ) {
600 $ok = fwrite( $this->spawnWrite
, "$id\n" );
601 // $this->progress( ">> $id" );
602 if ( !$ok ) return false;
604 $ok = fflush( $this->spawnWrite
);
605 // $this->progress( ">> [flush]" );
606 if ( !$ok ) return false;
608 // check that the text id they are sending is the one we asked for
609 // this avoids out of sync revision text errors we have encountered in the past
610 $newId = fgets( $this->spawnRead
);
611 if ( $newId === false ) {
614 if ( $id != intval( $newId ) ) {
618 $len = fgets( $this->spawnRead
);
619 // $this->progress( "<< " . trim( $len ) );
620 if ( $len === false ) return false;
622 $nbytes = intval( $len );
623 // actual error, not zero-length text
624 if ($nbytes < 0 ) return false;
628 // Subprocess may not send everything at once, we have to loop.
629 while ( $nbytes > strlen( $text ) ) {
630 $buffer = fread( $this->spawnRead
, $nbytes - strlen( $text ) );
631 if ( $buffer === false ) break;
// A short read is reported through progress(); what happens next is not
// visible in these lines.
635 $gotbytes = strlen( $text );
636 if ( $gotbytes != $nbytes ) {
637 $this->progress( "Expected $nbytes bytes from database subprocess, got $gotbytes " );
641 // Do normalization in the dump thread...
642 $stripped = str_replace( "\r", "", $text );
643 $normalized = $wgContLang->normalize( $stripped );
/**
 * XML parser callback for an opening tag (registered in readDump()).
 *
 * Flushes any pending open element, tracks the current element name and
 * page/revision state, hands the buffered output to the egress filter at
 * revision and page boundaries, and for <text id="..."> elements replaces
 * the stub with the text returned by getText().
 *
 * @param $parser
 * @param $name    Element name.
 * @param $attribs Attribute map of the element.
 */
647 function startElement( $parser, $name, $attribs ) {
648 $this->checkpointJustWritten
= false;
// Emit the previously remembered start tag, if any, before handling this one.
650 $this->clearOpenElement( null );
651 $this->lastName
= $name;
653 if ( $name == 'revision' ) {
654 $this->state
= $name;
655 $this->egress
->writeOpenPage( null, $this->buffer
);
657 } elseif ( $name == 'page' ) {
658 $this->state
= $name;
// Before the first page, what has been buffered so far is written as the
// stream opening.
659 if ( $this->atStart
) {
660 $this->egress
->writeOpenStream( $this->buffer
);
662 $this->atStart
= false;
// Stub text element: fetch the real text; the id attribute is replaced by
// xml:space="preserve" on the output element.
666 if ( $name == "text" && isset( $attribs['id'] ) ) {
667 $text = $this->getText( $attribs['id'] );
668 $this->openElement
= array( $name, array( 'xml:space' => 'preserve' ) );
669 if ( strlen( $text ) > 0 ) {
670 $this->characterData( $parser, $text );
// Any other element is remembered unchanged and written lazily by
// clearOpenElement().
673 $this->openElement
= array( $name, $attribs );
/**
 * XML parser callback for a closing tag (registered in readDump()).
 *
 * Completes the element in the buffer and hands the buffer to the egress
 * filter at revision, page and stream ends. When the time limit was
 * exceeded, the end of a page also closes the current output file(s),
 * renames them to their checkpoint names, reopens fresh ones and resets
 * the checkpoint state.
 *
 * @param $parser
 * @param $name Element name.
 */
677 function endElement( $parser, $name ) {
678 $this->checkpointJustWritten
= false;
// A still-pending start tag means the element had no content; otherwise a
// normal end tag is appended.
680 if ( $this->openElement
) {
681 $this->clearOpenElement( "" );
683 $this->buffer
.= "</$name>";
686 if ( $name == 'revision' ) {
687 $this->egress
->writeRevision( null, $this->buffer
);
690 } elseif ( $name == 'page' ) {
// Track the first and last page id written since the last checkpoint.
691 if (! $this->firstPageWritten
) {
692 $this->firstPageWritten
= trim($this->thisPage
);
694 $this->lastPageWritten
= trim($this->thisPage
);
695 if ($this->timeExceeded
) {
696 $this->egress
->writeClosePage( $this->buffer
);
697 // nasty hack, we can't just write the chardata after the
698 // page tag, it will include leading blanks from the next line
699 $this->egress
->sink
->write("\n");
// Finish the current file with a proper stream footer.
701 $this->buffer
= $this->xmlwriterobj
->closeStream();
702 $this->egress
->writeCloseStream( $this->buffer
);
705 $this->thisPage
= "";
706 // this could be more than one file if we had more than one output arg
708 $filenameList = (array)$this->egress
->getFilenames();
709 $newFilenames = array();
// Page ids are zero-padded to 9 digits before substitution into the patterns.
710 $firstPageID = str_pad($this->firstPageWritten
,9,"0",STR_PAD_LEFT
);
711 $lastPageID = str_pad($this->lastPageWritten
,9,"0",STR_PAD_LEFT
);
712 for ( $i = 0; $i < count( $filenameList ); $i++
) {
713 $checkpointNameFilledIn = sprintf( $this->checkpointFiles
[$i], $firstPageID, $lastPageID );
714 $fileinfo = pathinfo($filenameList[$i]);
715 $newFilenames[] = $fileinfo['dirname'] . '/' . $checkpointNameFilledIn;
717 $this->egress
->closeRenameAndReopen( $newFilenames );
// Start the next file's buffer with a stream header and reset the
// checkpoint bookkeeping.
718 $this->buffer
= $this->xmlwriterobj
->openStream();
719 $this->timeExceeded
= false;
720 $this->timeOfCheckpoint
= $this->lastTime
;
721 $this->firstPageWritten
= false;
722 $this->checkpointJustWritten
= true;
725 $this->egress
->writeClosePage( $this->buffer
);
727 $this->thisPage
= "";
730 } elseif ( $name == 'mediawiki' ) {
731 $this->egress
->writeCloseStream( $this->buffer
);
/**
 * XML parser callback for character data (also called directly by
 * startElement() with fetched revision text).
 *
 * Accumulates the content of <id> elements into thisRev or thisPage
 * depending on the current state, then appends the HTML-escaped data to
 * the output buffer.
 *
 * @param $parser
 * @param $data Character data.
 */
736 function characterData( $parser, $data ) {
737 $this->clearOpenElement( null );
// The parser may deliver one text node in several pieces, hence the
// concatenation.
738 if ( $this->lastName
== "id" ) {
739 if ( $this->state
== "revision" ) {
740 $this->thisRev
.= $data;
741 } elseif ( $this->state
== "page" ) {
742 $this->thisPage
.= $data;
745 // have to skip the newline left over from closepagetag line of
746 // end of checkpoint files. nasty hack!!
747 if ($this->checkpointJustWritten
) {
748 if ($data[0] == "\n") {
749 $data = substr($data,1);
751 $this->checkpointJustWritten
= false;
753 $this->buffer
.= htmlspecialchars( $data );
/**
 * Writes the remembered start tag, if any, to the buffer and forgets it.
 *
 * @param $style Passed through as the third argument of Xml::element();
 *   callers in this file pass null (from startElement()/characterData())
 *   or "" (from endElement()).
 */
756 function clearOpenElement( $style ) {
757 if ( $this->openElement
) {
// openElement is array( name, attributes ) as stored by startElement().
758 $this->buffer
.= Xml
::element( $this->openElement
[0], $this->openElement
[1], $style );
759 $this->openElement
= false;
765 $dumper = new TextPassDumper( $argv );
767 if ( !isset( $options['help'] ) ) {
768 $dumper->dump( true );
770 $dumper->progress( <<<ENDS
771 This script postprocesses XML dumps from dumpBackup.php to add
772 page text which was stubbed out (using --stub).
774 XML input is accepted on stdin.
775 XML output is sent to stdout; progress reports are sent to stderr.
777 Usage: php dumpTextPass.php [<options>]
779 --stub=<type>:<file> To load a compressed stub dump instead of stdin
780 --prefetch=<type>:<file> Use a prior dump file as a text source, to save
781 pressure on the database.
782 (Requires the XMLReader extension)
783 --maxtime=<minutes> Write out checkpoint file after this many minutes (writing
784 out complete page, closing xml file properly, and opening new one
785 with header). This option requires the checkpointfile option.
786 --checkpointfile=<filenamepattern> Use this string for checkpoint filenames,
787 substituting first pageid written for the first %s (required) and the
788 last pageid written for the second %s if it exists.
789 --quiet Don't dump status reports to stderr.
790 --report=n Report position and speed after every n pages processed.
792 --server=h Force reading from MySQL server h
793 --current Base ETA on number of pages in database instead of all revisions
794 --spawn Spawn a subprocess for loading text records
795 --help Display this help message