Avoid division by zero
[lhc/web/wiklou.git] / maintenance / storage / recompressTracked.php
1 <?php
2
// Command-line entry point: parse the arguments, then run the recompression job.
//
// NOTE(review): $optionsWithArgs is not used again in this file; presumably
// commandLine.inc reads it to learn which options consume a value, which is
// why it is assigned before the require -- confirm against commandLine.inc.
$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
require( dirname( __FILE__ ) .'/../commandLine.inc' );

// At least one destination cluster is required; otherwise print usage and exit
// with a non-zero status.
// NOTE(review): $args and $options are not defined in this file; presumably
// they are populated by commandLine.inc -- confirm.
if ( count( $args ) < 1 ) {
	echo "Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable.

Options:
--procs <procs> Set the number of child processes (default 1)
--copy-only Copy only, do not update the text table. Restart without this option to complete.
--debug-log <file> Log debugging data to the specified file
--info-log <file> Log progress messages to the specified file
--critical-log <file> Log error messages to the specified file
";
	exit( 1 );
}

// Build the job from the positional arguments (destination clusters) and the
// parsed options, then run it as parent or child depending on --child.
$job = RecompressTracked::newFromCommandLine( $args, $options );
$job->execute();
22
/**
 * Job that moves text rows listed in the blob_tracking table to a list of
 * destination clusters, recompressing them on the way.
 *
 * The same class runs in two roles, selected by the isChild option:
 *  - the parent enumerates pages and orphan text IDs and hands them to worker
 *    processes over their stdin pipes;
 *  - each child reads commands ("doPage <id>", "doOrphanList <id> ...",
 *    "quit") from stdin and performs the actual copy/move.
 */
class RecompressTracked {
	var $destClusters;
	var $batchSize = 1000;
	var $orphanBatchSize = 1000;
	var $reportingInterval = 10;
	// Number of report() calls since the last progress line was printed
	var $numBatches = 0;
	var $numProcs = 1;
	var $useDiff, $pageBlobClass, $orphanBlobClass;
	var $slavePipes, $slaveProcs, $prevSlaveId;
	var $copyOnly = false;
	var $isChild = false;
	var $slaveId = false;
	var $debugLog, $infoLog, $criticalLog;
	var $store;

	// Command-line options that consume a value
	static $optionsWithArgs = array( 'procs', 'slave-id', 'debug-log', 'info-log', 'critical-log' );
	// Command-line option name => property name on this class
	static $cmdLineOptionMap = array(
		'procs' => 'numProcs',
		'copy-only' => 'copyOnly',
		'child' => 'isChild',
		'slave-id' => 'slaveId',
		'debug-log' => 'debugLog',
		'info-log' => 'infoLog',
		'critical-log' => 'criticalLog',
	);

	/**
	 * Get the list of command-line options that take a value.
	 *
	 * @return array List of option names
	 */
	static function getOptionsWithArgs() {
		return self::$optionsWithArgs;
	}

	/**
	 * Build a job from parsed command-line input.
	 *
	 * @param $args array Positional arguments, used as the destination clusters
	 * @param $options array Parsed options keyed by command-line option name
	 * @return RecompressTracked
	 */
	static function newFromCommandLine( $args, $options ) {
		$jobOptions = array( 'destClusters' => $args );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( isset( $options[$cmdOption] ) ) {
				$jobOptions[$classOption] = $options[$cmdOption];
			}
		}
		return new self( $jobOptions );
	}

	/**
	 * Copy every entry of $options onto the property of the same name, then
	 * set up the external store, the debug log prefix and the blob classes.
	 *
	 * @param $options array Property name => value
	 */
	function __construct( $options ) {
		foreach ( $options as $name => $value ) {
			$this->$name = $value;
		}
		$this->store = new ExternalStoreDB;
		if ( !$this->isChild ) {
			$GLOBALS['wgDebugLogPrefix'] = "RCT M: ";
		} elseif ( $this->slaveId !== false ) {
			$GLOBALS['wgDebugLogPrefix'] = "RCT {$this->slaveId}: ";
		}
		// Diff-based blobs are only used for pages when xdiff is available
		$this->useDiff = function_exists( 'xdiff_string_bdiff' );
		$this->pageBlobClass = $this->useDiff ? 'DiffHistoryBlob' : 'ConcatenatedGzipHistoryBlob';
		$this->orphanBlobClass = 'ConcatenatedGzipHistoryBlob';
	}

	/**
	 * Send a message to wfDebug() and, if configured, to the debug log file.
	 *
	 * @param $msg string
	 */
	function debug( $msg ) {
		wfDebug( "$msg\n" );
		if ( $this->debugLog ) {
			$this->logToFile( $msg, $this->debugLog );
		}
	}

	/**
	 * Print a progress message and, if configured, append it to the info log file.
	 *
	 * @param $msg string
	 */
	function info( $msg ) {
		echo "$msg\n";
		if ( $this->infoLog ) {
			$this->logToFile( $msg, $this->infoLog );
		}
	}

	/**
	 * Print an error message and, if configured, append it to the critical log file.
	 *
	 * @param $msg string
	 */
	function critical( $msg ) {
		echo "$msg\n";
		if ( $this->criticalLog ) {
			$this->logToFile( $msg, $this->criticalLog );
		}
	}

	/**
	 * Append a message to a log file, prefixed with a header made of the
	 * timestamp, host name, process ID, slave ID (when set) and wiki ID.
	 *
	 * @param $msg string
	 * @param $file string Destination passed to wfErrorLog()
	 */
	function logToFile( $msg, $file ) {
		$header = '[' . date('d\TH:i:s') . '] ' . wfHostname() . ' ' . posix_getpid();
		if ( $this->slaveId !== false ) {
			$header .= "({$this->slaveId})";
		}
		$header .= ' ' . wfWikiID();
		wfErrorLog( sprintf( "%-50s %s\n", $header, $msg ), $file );
	}

	/**
	 * Wait until the selected slave has caught up to the master.
	 * This allows us to use the slave for things that were committed in a
	 * previous part of this batch process.
	 */
	function syncDBs() {
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_SLAVE );
		$pos = $dbw->getMasterPos();
		$dbr->masterPosWait( $pos, 100000 );
	}

	/**
	 * Execute parent or child depending on the isChild option
	 */
	function execute() {
		if ( $this->isChild ) {
			$this->executeChild();
		} else {
			$this->executeParent();
		}
	}

	/**
	 * Execute the parent process: start the workers, queue all pages and
	 * orphans, then shut the workers down.
	 */
	function executeParent() {
		if ( !$this->checkTrackingTable() ) {
			return;
		}

		$this->syncDBs();
		$this->startSlaveProcs();
		$this->doAllPages();
		$this->doAllOrphans();
		$this->killSlaveProcs();
	}

	/**
	 * Make sure the tracking table exists and isn't empty
	 *
	 * @return bool False if there is nothing to do for this wiki
	 */
	function checkTrackingTable() {
		$dbr = wfGetDB( DB_SLAVE );
		if ( !$dbr->tableExists( 'blob_tracking' ) ) {
			$this->critical( "Error: blob_tracking table does not exist" );
			return false;
		}
		$row = $dbr->selectRow( 'blob_tracking', '*', false, __METHOD__ );
		if ( !$row ) {
			$this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." );
			return false;
		}
		return true;
	}

	/**
	 * Start the worker processes.
	 * These processes will listen on stdin for commands.
	 * This is necessary because text recompression is slow: loading,
	 * compressing and writing are all slow.
	 */
	function startSlaveProcs() {
		$cmd = 'php ' . wfEscapeShellArg( __FILE__ );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( $cmdOption == 'slave-id' ) {
				// Appended per worker below; the parent has no slave ID of its own
				continue;
			} elseif ( in_array( $cmdOption, self::$optionsWithArgs ) ) {
				// Only forward value options that are actually set, so the
				// child is not handed empty arguments such as --debug-log ''
				if ( isset( $this->$classOption ) ) {
					$cmd .= " --$cmdOption " . wfEscapeShellArg( $this->$classOption );
				}
			} elseif ( $this->$classOption ) {
				$cmd .= " --$cmdOption";
			}
		}
		$cmd .= ' --child' .
			' --wiki ' . wfEscapeShellArg( wfWikiID() ) .
			' ' . call_user_func_array( 'wfEscapeShellArg', $this->destClusters );

		$this->slavePipes = $this->slaveProcs = array();
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$pipes = false;
			// The child gets a pipe for stdin; its stdout/stderr are opened on
			// /dev/stdout and /dev/stderr
			$spec = array(
				array( 'pipe', 'r' ),
				array( 'file', '/dev/stdout', 'w' ),
				array( 'file', '/dev/stderr', 'w' )
			);
			wfSuppressWarnings();
			$proc = proc_open( "$cmd --slave-id $i", $spec, $pipes );
			wfRestoreWarnings();
			if ( !$proc ) {
				$this->critical( "Error opening slave process" );
				exit( 1 );
			}
			$this->slaveProcs[$i] = $proc;
			$this->slavePipes[$i] = $pipes[0];
		}
		$this->prevSlaveId = -1;
	}

	/**
	 * Gracefully terminate the child processes
	 */
	function killSlaveProcs() {
		$this->info( "Waiting for slave processes to finish..." );
		// Tell every child to quit first, then collect their exit statuses
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$this->dispatchToSlave( $i, 'quit' );
		}
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$status = proc_close( $this->slaveProcs[$i] );
			if ( $status ) {
				$this->critical( "Warning: child #$i exited with status $status" );
			}
		}
		$this->info( "Done." );
	}

	/**
	 * Dispatch a command to the next available slave.
	 * This may block until a slave finishes its work and becomes available.
	 *
	 * All arguments are joined with spaces to form the command line.
	 */
	function dispatch( /*...*/ ) {
		$args = func_get_args();
		$pipes = $this->slavePipes;
		// stream_select() takes its arrays by reference, so it must be given
		// real variables. On return, $pipes holds only the writable pipes.
		$readPipes = array();
		$exceptPipes = array();
		$numPipes = stream_select( $readPipes, $pipes, $exceptPipes, 3600 );
		if ( !$numPipes ) {
			$this->critical( "Error waiting to write to slaves. Aborting" );
			exit( 1 );
		}
		// Round-robin: start with the slave after the one used last time
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs;
			if ( isset( $pipes[$slaveId] ) ) {
				$this->prevSlaveId = $slaveId;
				$this->dispatchToSlave( $slaveId, $args );
				return;
			}
		}
		$this->critical( "Unreachable" );
		exit( 1 );
	}

	/**
	 * Dispatch a command to a specified slave
	 *
	 * @param $slaveId int Index into slavePipes
	 * @param $args array|string Command words; joined with spaces
	 */
	function dispatchToSlave( $slaveId, $args ) {
		$args = (array)$args;
		$cmd = implode( ' ', $args );
		fwrite( $this->slavePipes[$slaveId], "$cmd\n" );
	}

	/**
	 * Move all tracked pages to the new clusters
	 */
	function doAllPages() {
		$dbr = wfGetDB( DB_SLAVE );
		$i = 0;
		$startId = 0;
		$numPages = $dbr->selectField( 'blob_tracking',
			'COUNT(DISTINCT bt_page)',
			# A condition is required so that this query uses the index
			array( 'bt_moved' => 0 ),
			__METHOD__
		);
		if ( $this->copyOnly ) {
			$this->info( "Copying pages..." );
		} else {
			$this->info( "Moving pages..." );
		}
		// Page through the distinct page IDs in ascending order
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				array( 'bt_page' ),
				array(
					'bt_moved' => 0,
					'bt_page > ' . $dbr->addQuotes( $startId )
				),
				__METHOD__,
				array(
					'DISTINCT',
					'ORDER BY' => 'bt_page',
					'LIMIT' => $this->batchSize,
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			foreach ( $res as $row ) {
				$this->dispatch( 'doPage', $row->bt_page );
				$i++;
			}
			// $row still holds the last row of the batch
			$startId = $row->bt_page;
			$this->report( 'pages', $i, $numPages );
		}
		$this->report( 'pages', $i, $numPages );
		if ( $this->copyOnly ) {
			$this->info( "All page copies queued." );
		} else {
			$this->info( "All page moves queued." );
		}
	}

	/**
	 * Display a progress report.
	 *
	 * A line is printed when $current reaches $end, or once every
	 * reportingInterval calls; wfWaitForSlaves() is called at the same time.
	 *
	 * @param $label string
	 * @param $current int
	 * @param $end int
	 */
	function report( $label, $current, $end ) {
		$this->numBatches++;
		if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
			$this->numBatches = 0;
			$this->info( "$label: $current / $end" );
			wfWaitForSlaves( 5 );
		}
	}

	/**
	 * Move all orphan text to the new clusters
	 */
	function doAllOrphans() {
		$dbr = wfGetDB( DB_SLAVE );
		$startId = 0;
		$i = 0;
		$numOrphans = $dbr->selectField( 'blob_tracking',
			'COUNT(DISTINCT bt_text_id)',
			array( 'bt_moved' => 0, 'bt_page' => 0 ),
			__METHOD__ );
		if ( !$numOrphans ) {
			return;
		}
		if ( $this->copyOnly ) {
			$this->info( "Copying orphans..." );
		} else {
			$this->info( "Moving orphans..." );
		}
		$ids = array();

		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				array( 'bt_text_id' ),
				array(
					'bt_moved' => 0,
					'bt_page' => 0,
					'bt_text_id > ' . $dbr->addQuotes( $startId )
				),
				__METHOD__,
				array(
					'DISTINCT',
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			foreach ( $res as $row ) {
				$ids[] = $row->bt_text_id;
				$i++;
			}
			// Need to send enough orphan IDs to the child at a time to fill a blob,
			// so orphanBatchSize needs to be at least ~100.
			// batchSize can be smaller or larger.
			while ( count( $ids ) > $this->orphanBatchSize ) {
				$args = array_slice( $ids, 0, $this->orphanBatchSize );
				$ids = array_slice( $ids, $this->orphanBatchSize );
				array_unshift( $args, 'doOrphanList' );
				call_user_func_array( array( $this, 'dispatch' ), $args );
			}
			// $row still holds the last row of the batch
			$startId = $row->bt_text_id;
			$this->report( 'orphans', $i, $numOrphans );
		}
		// Flush the IDs left over after the last full batch. Without this the
		// final (at most orphanBatchSize) orphans would never be queued.
		if ( count( $ids ) ) {
			array_unshift( $ids, 'doOrphanList' );
			call_user_func_array( array( $this, 'dispatch' ), $ids );
		}
		$this->report( 'orphans', $i, $numOrphans );
		$this->info( "All orphans queued." );
	}

	/**
	 * Main entry point for worker processes.
	 *
	 * Reads one command per line from stdin until EOF or "quit". Blank lines
	 * and unrecognised commands are ignored.
	 */
	function executeChild() {
		$this->debug( 'starting' );
		$this->syncDBs();

		while ( !feof( STDIN ) ) {
			$line = rtrim( fgets( STDIN ) );
			if ( $line == '' ) {
				continue;
			}
			$this->debug( $line );
			$args = explode( ' ', $line );
			$cmd = array_shift( $args );
			switch ( $cmd ) {
			case 'doPage':
				$this->doPage( intval( $args[0] ) );
				break;
			case 'doOrphanList':
				$this->doOrphanList( array_map( 'intval', $args ) );
				break;
			case 'quit':
				return;
			}
		}
	}

	/**
	 * Move tracked text in a given page
	 *
	 * @param $pageId int
	 */
	function doPage( $pageId ) {
		$title = Title::newFromId( $pageId );
		if ( $title ) {
			$titleText = $title->getPrefixedText();
		} else {
			$titleText = '[deleted]';
		}
		$dbr = wfGetDB( DB_SLAVE );

		// Finish any incomplete transactions
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( array( 'bt_page' => $pageId ) );
			$this->syncDBs();
		}

		$startId = 0;
		$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );

		while ( true ) {
			$res = $dbr->select(
				array( 'blob_tracking', 'text' ),
				'*',
				array(
					'bt_page' => $pageId,
					'bt_text_id > ' . $dbr->addQuotes( $startId ),
					'bt_moved' => 0,
					'bt_new_url IS NULL',
					'bt_text_id=old_id',
				),
				__METHOD__,
				array(
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				)
			);
			if ( !$res->numRows() ) {
				break;
			}

			$lastTextId = 0;
			foreach ( $res as $row ) {
				if ( $lastTextId == $row->bt_text_id ) {
					// Duplicate (null edit)
					continue;
				}
				$lastTextId = $row->bt_text_id;
				// Load the text
				$text = Revision::getRevisionText( $row );
				if ( $text === false ) {
					$this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
					continue;
				}

				// Queue it; addItem() returns false once the blob should be committed
				if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
					$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
					$trx->commit();
					$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
				}
			}
			// $row still holds the last row of the batch
			$startId = $row->bt_text_id;
		}

		$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
		$trx->commit();
	}

	/**
	 * Atomic move operation.
	 *
	 * Write the new URL to the text table and set the bt_moved flag.
	 *
	 * This is done in a single transaction to provide restartable behaviour
	 * without data loss.
	 *
	 * The transaction is kept short to reduce locking.
	 *
	 * @param $textId int
	 * @param $url string
	 */
	function moveTextRow( $textId, $url ) {
		if ( $this->copyOnly ) {
			$this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" );
			exit( 1 );
		}
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin();
		$dbw->update( 'text',
			array( // set
				'old_text' => $url,
				'old_flags' => 'external,utf-8',
			),
			array( // where
				'old_id' => $textId
			),
			__METHOD__
		);
		$dbw->update( 'blob_tracking',
			array( 'bt_moved' => 1 ),
			array( 'bt_text_id' => $textId ),
			__METHOD__
		);
		$dbw->commit();
	}

	/**
	 * Moves are done in two phases: bt_new_url and then bt_moved.
	 *  - bt_new_url indicates that the text has been copied to the new cluster.
	 *  - bt_moved indicates that the text table has been updated.
	 *
	 * This function completes any moves that only have done bt_new_url. This
	 * can happen when the script is interrupted, or when --copy-only is used.
	 *
	 * @param $conds array Extra conditions restricting which rows to finish
	 */
	function finishIncompleteMoves( $conds ) {
		$dbr = wfGetDB( DB_SLAVE );

		$startId = 0;
		$conds = array_merge( $conds, array(
			'bt_moved' => 0,
			'bt_new_url IS NOT NULL'
		));
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				'*',
				array_merge( $conds, array( 'bt_text_id > ' . $dbr->addQuotes( $startId ) ) ),
				__METHOD__,
				array(
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize,
				)
			);
			if ( !$res->numRows() ) {
				break;
			}
			$this->debug( 'Incomplete: ' . $res->numRows() . ' rows' );
			foreach ( $res as $row ) {
				$this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
			}
			// $row still holds the last row of the batch
			$startId = $row->bt_text_id;
		}
	}

	/**
	 * Returns the name of the next target cluster.
	 *
	 * Advances the internal array pointer of destClusters and wraps around to
	 * the first entry when the end is reached.
	 *
	 * @return string
	 */
	function getTargetCluster() {
		$cluster = next( $this->destClusters );
		if ( $cluster === false ) {
			$cluster = reset( $this->destClusters );
		}
		return $cluster;
	}

	/**
	 * Gets a DB master connection for the given external cluster name
	 *
	 * @param $cluster string
	 */
	function getExtDB( $cluster ) {
		$lb = wfGetLBFactory()->getExternalLB( $cluster );
		return $lb->getConnection( DB_MASTER );
	}

	/**
	 * Move a list of orphan text IDs to the new cluster
	 *
	 * @param $textIds array List of text IDs
	 */
	function doOrphanList( $textIds ) {
		// Finish incomplete moves
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
			$this->syncDBs();
		}

		$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );

		$res = wfGetDB( DB_SLAVE )->select(
			array( 'text', 'blob_tracking' ),
			array( 'old_id', 'old_text', 'old_flags' ),
			array(
				'old_id' => $textIds,
				'bt_text_id=old_id',
				'bt_moved' => 0,
			),
			__METHOD__,
			array( 'DISTINCT' )
		);

		foreach ( $res as $row ) {
			$text = Revision::getRevisionText( $row );
			if ( $text === false ) {
				// Report the ID of the row that actually failed to load
				$this->critical( "Error: cannot load revision text for old_id={$row->old_id}" );
				continue;
			}

			// addItem() returns false once the blob should be committed
			if ( !$trx->addItem( $text, $row->old_id ) ) {
				$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
				$trx->commit();
				$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
			}
		}
		$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
		$trx->commit();
	}
}
604
605 /**
606 * Class to represent a recompression operation for a single CGZ blob
607 */
608 class CgzCopyTransaction {
609 var $parent;
610 var $blobClass;
611 var $cgz;
612 var $referrers;
613
614 /**
615 * Create a transaction from a RecompressTracked object
616 */
617 function __construct( $parent, $blobClass ) {
618 $this->blobClass = $blobClass;
619 $this->cgz = false;
620 $this->texts = array();
621 $this->parent = $parent;
622 }
623
624 /**
625 * Add text.
626 * Returns false if it's ready to commit.
627 */
628 function addItem( $text, $textId ) {
629 if ( !$this->cgz ) {
630 $class = $this->blobClass;
631 $this->cgz = new $class;
632 }
633 $hash = $this->cgz->addItem( $text );
634 $this->referrers[$textId] = $hash;
635 $this->texts[$textId] = $text;
636 return $this->cgz->isHappy();
637 }
638
639 function getSize() {
640 return count( $this->texts );
641 }
642
643 /**
644 * Recompress text after some aberrant modification
645 */
646 function recompress() {
647 $class = $this->blobClass;
648 $this->cgz = new $class;
649 $this->referrers = array();
650 foreach ( $this->texts as $textId => $text ) {
651 $hash = $this->cgz->addItem( $text );
652 $this->referrers[$textId] = $hash;
653 }
654 }
655
656 /**
657 * Commit the blob.
658 * Does nothing if no text items have been added.
659 * May skip the move if --copy-only is set.
660 */
661 function commit() {
662 $originalCount = count( $this->texts );
663 if ( !$originalCount ) {
664 return;
665 }
666
667 // Check to see if the target text_ids have been moved already.
668 //
669 // We originally read from the slave, so this can happen when a single
670 // text_id is shared between multiple pages. It's rare, but possible
671 // if a delete/move/undelete cycle splits up a null edit.
672 //
673 // We do a locking read to prevent closer-run race conditions.
674 $dbw = wfGetDB( DB_MASTER );
675 $dbw->begin();
676 $res = $dbw->select( 'blob_tracking',
677 array( 'bt_text_id', 'bt_moved' ),
678 array( 'bt_text_id' => array_keys( $this->referrers ) ),
679 __METHOD__, array( 'FOR UPDATE' ) );
680 $dirty = false;
681 foreach ( $res as $row ) {
682 if ( $row->bt_moved ) {
683 # This row has already been moved, remove it
684 $this->parent->debug( "TRX: conflict detected in old_id={$row->bt_text_id}" );
685 unset( $this->texts[$row->bt_text_id] );
686 $dirty = true;
687 }
688 }
689
690 // Recompress the blob if necessary
691 if ( $dirty ) {
692 if ( !count( $this->texts ) ) {
693 // All have been moved already
694 if ( $originalCount > 1 ) {
695 // This is suspcious, make noise
696 $this->critical( "Warning: concurrent operation detected, are there two conflicting " .
697 "processes running, doing the same job?" );
698 }
699 return;
700 }
701 $this->recompress();
702 }
703
704 // Insert the data into the destination cluster
705 $targetCluster = $this->parent->getTargetCluster();
706 $store = $this->parent->store;
707 $targetDB = $store->getMaster( $targetCluster );
708 $targetDB->clearFlag( DBO_TRX ); // we manage the transactions
709 $targetDB->begin();
710 $baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) );
711
712 // Write the new URLs to the blob_tracking table
713 foreach ( $this->referrers as $textId => $hash ) {
714 $url = $baseUrl . '/' . $hash;
715 $dbw->update( 'blob_tracking',
716 array( 'bt_new_url' => $url ),
717 array(
718 'bt_text_id' => $textId,
719 'bt_moved' => 0, # Check for concurrent conflicting update
720 ),
721 __METHOD__
722 );
723 }
724
725 $targetDB->commit();
726 // Critical section here: interruption at this point causes blob duplication
727 // Reversing the order of the commits would cause data loss instead
728 $dbw->commit();
729
730 // Write the new URLs to the text table and set the moved flag
731 if ( !$this->parent->copyOnly ) {
732 foreach ( $this->referrers as $textId => $hash ) {
733 $url = $baseUrl . '/' . $hash;
734 $this->parent->moveTextRow( $textId, $url );
735 }
736 }
737 }
738 }
739