Fixed a bug where the last batch of orphans would be skipped.
[lhc/web/wiklou.git] / maintenance / storage / recompressTracked.php
1 <?php
2
// Command line entry point.
// $optionsWithArgs is set before commandLine.inc is included; presumably the
// include reads it to decide which options consume an argument -- TODO confirm.
$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
require( dirname( __FILE__ ) . '/../commandLine.inc' );

// At least one destination cluster is required
if ( !count( $args ) ) {
	$usage = "Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
Moves blobs indexed by trackBlobs.php to a specified list of destination clusters, and recompresses them in the process. Restartable.

Options:
--procs <procs> Set the number of child processes (default 1)
--copy-only Copy only, do not update the text table. Restart without this option to complete.
--debug-log <file> Log debugging data to the specified file
--info-log <file> Log progress messages to the specified file
--critical-log <file> Log error messages to the specified file
";
	echo $usage;
	exit( 1 );
}

// Runs as parent or child depending on the --child option
$job = RecompressTracked::newFromCommandLine( $args, $options );
$job->execute();
22
23 class RecompressTracked {
# Names of the destination clusters (the positional command line arguments)
var $destClusters;
# Row limit used for each batched query
var $batchSize = 1000;
# Maximum number of orphan text IDs sent in a single doOrphanList command
var $orphanBatchSize = 1000;
# Number of report() calls between two progress messages
var $reportingInterval = 10;
# Number of report() calls since the last progress message.
# Declared here so that the first increment in report() does not read an
# undefined property.
var $numBatches = 0;
# Number of child processes to start (--procs)
var $numProcs = 1;
# Blob class selection, filled in by the constructor
var $useDiff, $pageBlobClass, $orphanBlobClass;
# Parent-side bookkeeping for the child processes: stdin pipes, process
# handles, and the ID of the child that received the previous command
var $slavePipes, $slaveProcs, $prevSlaveId;
# True if the text table should be left untouched (--copy-only)
var $copyOnly = false;
# True when running as a worker process (--child)
var $isChild = false;
# ID of this worker (--slave-id), or false if none was given
var $slaveId = false;
# Optional log file names (--debug-log, --info-log, --critical-log)
var $debugLog, $infoLog, $criticalLog;
# ExternalStoreDB instance, created by the constructor
var $store;

# Command line options that take an argument
static $optionsWithArgs = array( 'procs', 'slave-id', 'debug-log', 'info-log', 'critical-log' );
# Map from command line option name to the member variable it sets
static $cmdLineOptionMap = array(
	'procs' => 'numProcs',
	'copy-only' => 'copyOnly',
	'child' => 'isChild',
	'slave-id' => 'slaveId',
	'debug-log' => 'debugLog',
	'info-log' => 'infoLog',
	'critical-log' => 'criticalLog',
);
47
/**
 * Get the names of the command line options that take an argument.
 *
 * @return array List of option names, without the leading dashes
 */
static function getOptionsWithArgs() {
	$names = self::$optionsWithArgs;
	return $names;
}
51
/**
 * Build a job object from parsed command line input.
 *
 * @param $args array Positional arguments; used as the destination cluster list
 * @param $options array Parsed options, keyed by option name
 * @return RecompressTracked
 */
static function newFromCommandLine( $args, $options ) {
	$settings = array( 'destClusters' => $args );
	foreach ( self::$cmdLineOptionMap as $optName => $memberName ) {
		if ( !isset( $options[$optName] ) ) {
			// Option not given, keep the class default
			continue;
		}
		$settings[$memberName] = $options[$optName];
	}
	return new self( $settings );
}
61
/**
 * @param $options array Map of member variable name to value. Every entry
 *   is copied onto the object as-is.
 */
function __construct( $options ) {
	foreach ( $options as $member => $setting ) {
		$this->$member = $setting;
	}
	$this->store = new ExternalStoreDB;

	// Tag debug log lines with "M" for the parent or the worker ID for a child
	if ( $this->isChild ) {
		if ( $this->slaveId !== false ) {
			$GLOBALS['wgDebugLogPrefix'] = "RCT {$this->slaveId}: ";
		}
	} else {
		$GLOBALS['wgDebugLogPrefix'] = "RCT M: ";
	}

	// Page blobs use the diff-based class only when xdiff_string_bdiff() exists
	$this->useDiff = function_exists( 'xdiff_string_bdiff' );
	if ( $this->useDiff ) {
		$this->pageBlobClass = 'DiffHistoryBlob';
	} else {
		$this->pageBlobClass = 'ConcatenatedGzipHistoryBlob';
	}
	$this->orphanBlobClass = 'ConcatenatedGzipHistoryBlob';
}
76
/**
 * Send a message to wfDebug() and, if --debug-log was given, to that file.
 */
function debug( $msg ) {
	wfDebug( "$msg\n" );
	if ( !$this->debugLog ) {
		return;
	}
	$this->logToFile( $msg, $this->debugLog );
}
84
/**
 * Print a progress message and, if --info-log was given, log it to that file.
 */
function info( $msg ) {
	echo "$msg\n";
	if ( !$this->infoLog ) {
		return;
	}
	$this->logToFile( $msg, $this->infoLog );
}
91
/**
 * Print an error message and, if --critical-log was given, log it to that file.
 */
function critical( $msg ) {
	echo "$msg\n";
	if ( !$this->criticalLog ) {
		return;
	}
	$this->logToFile( $msg, $this->criticalLog );
}
98
/**
 * Append a message to a log file, prefixed with a header made of the
 * time, host name, process ID, worker ID (if any) and wiki ID.
 *
 * Note that the date format only contains the day of the month followed
 * by a literal "T" and the time of day.
 */
function logToFile( $msg, $file ) {
	$stamp = date('d\TH:i:s');
	$prefix = '[' . $stamp . '] ' . wfHostname() . ' ' . posix_getpid();
	$worker = ( $this->slaveId !== false ) ? "({$this->slaveId})" : '';
	$prefix .= $worker . ' ' . wfWikiID();
	// The header is left-aligned and padded to 50 characters
	$line = sprintf( "%-50s %s\n", $prefix, $msg );
	wfErrorLog( $line, $file );
}
107
/**
 * Block until the selected slave has reached the master's current position.
 * After this, things committed earlier in this batch process can be read
 * back from the slave.
 */
function syncDBs() {
	$master = wfGetDB( DB_MASTER );
	$slave = wfGetDB( DB_SLAVE );
	// 100000 is the wait limit passed to masterPosWait() (presumably seconds)
	$slave->masterPosWait( $master->getMasterPos(), 100000 );
}
119
/**
 * Run as a worker if the isChild option is set, otherwise as the parent
 */
function execute() {
	if ( !$this->isChild ) {
		$this->executeParent();
		return;
	}
	$this->executeChild();
}
130
/**
 * Parent process: start the workers, queue all pages and then all orphans,
 * and finally wait for the workers to exit. Does nothing if the tracking
 * table is missing or empty.
 */
function executeParent() {
	if ( $this->checkTrackingTable() ) {
		$this->syncDBs();
		$this->startSlaveProcs();
		$this->doAllPages();
		$this->doAllOrphans();
		$this->killSlaveProcs();
	}
}
145
/**
 * Check that the blob_tracking table exists and has at least one row.
 *
 * @return bool False (after printing a message) if there is nothing to do
 */
function checkTrackingTable() {
	$db = wfGetDB( DB_SLAVE );
	if ( !$db->tableExists( 'blob_tracking' ) ) {
		$this->critical( "Error: blob_tracking table does not exist" );
		return false;
	}
	$firstRow = $db->selectRow( 'blob_tracking', '*', false, __METHOD__ );
	if ( $firstRow ) {
		return true;
	}
	$this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." );
	return false;
}
162
/**
 * Start the worker processes.
 * Each worker is this same script run with --child and reads commands from
 * its stdin, which is a pipe held by the parent.
 * This is necessary because text recompression is slow: loading, compressing
 * and writing are all slow.
 */
function startSlaveProcs() {
	// Rebuild the command line, forwarding every option except --slave-id,
	// which is appended per worker below
	$cmd = 'php ' . wfEscapeShellArg( __FILE__ );
	foreach ( self::$cmdLineOptionMap as $optName => $memberName ) {
		if ( $optName == 'slave-id' ) {
			continue;
		}
		$takesArg = in_array( $optName, self::$optionsWithArgs );
		if ( $takesArg && isset( $this->$memberName ) ) {
			$cmd .= " --$optName " . wfEscapeShellArg( $this->$memberName );
		} elseif ( $this->$memberName ) {
			$cmd .= " --$optName";
		}
	}
	$wikiArg = wfEscapeShellArg( wfWikiID() );
	$clusterArgs = call_user_func_array( 'wfEscapeShellArg', $this->destClusters );
	$cmd .= ' --child' . ' --wiki ' . $wikiArg . ' ' . $clusterArgs;

	// stdin is a pipe from the parent, stdout and stderr go to the parent's own
	$descriptors = array(
		array( 'pipe', 'r' ),
		array( 'file', 'php://stdout', 'w' ),
		array( 'file', 'php://stderr', 'w' )
	);

	$this->slavePipes = array();
	$this->slaveProcs = array();
	for ( $id = 0; $id < $this->numProcs; $id++ ) {
		$pipes = false;
		wfSuppressWarnings();
		$proc = proc_open( "$cmd --slave-id $id", $descriptors, $pipes );
		wfRestoreWarnings();
		if ( !$proc ) {
			$this->critical( "Error opening slave process: $cmd" );
			exit( 1 );
		}
		$this->slaveProcs[$id] = $proc;
		$this->slavePipes[$id] = $pipes[0];
	}
	// dispatch() starts its search at the worker after this one, i.e. worker 0
	$this->prevSlaveId = -1;
}
204
/**
 * Ask every worker to quit, then wait for each one to exit and report
 * any non-zero exit status.
 */
function killSlaveProcs() {
	$this->info( "Waiting for slave processes to finish..." );
	$numWorkers = $this->numProcs;
	// Send all the quit commands first so the workers can wind down in parallel
	for ( $id = 0; $id < $numWorkers; $id++ ) {
		$this->dispatchToSlave( $id, 'quit' );
	}
	for ( $id = 0; $id < $numWorkers; $id++ ) {
		$exitCode = proc_close( $this->slaveProcs[$id] );
		if ( !$exitCode ) {
			continue;
		}
		$this->critical( "Warning: child #$id exited with status $exitCode" );
	}
	$this->info( "Done." );
}
221
/**
 * Dispatch a command to the next available slave.
 * This may block until a slave finishes its work and becomes available.
 *
 * All arguments are joined with spaces and sent as one command line, e.g.
 * dispatch( 'doPage', 123 ). Exits the script if no slave pipe becomes
 * writable within an hour.
 */
function dispatch( /*...*/ ) {
	$args = func_get_args();

	// stream_select() takes its three arrays by reference, so they must be
	// real variables: passing an assignment expression such as $x=array()
	// is not a valid by-reference argument.
	$readable = array();
	$writable = $this->slavePipes;
	$except = array();
	$numPipes = stream_select( $readable, $writable, $except, 3600 );
	if ( !$numPipes ) {
		// false on error, 0 on timeout
		$this->critical( "Error waiting to write to slaves. Aborting" );
		exit( 1 );
	}

	// $writable now holds only the pipes that can be written to. Pick the
	// first one after the previously used slave, wrapping around.
	for ( $i = 0; $i < $this->numProcs; $i++ ) {
		$slaveId = ( $i + $this->prevSlaveId + 1 ) % $this->numProcs;
		if ( isset( $writable[$slaveId] ) ) {
			$this->prevSlaveId = $slaveId;
			$this->dispatchToSlave( $slaveId, $args );
			return;
		}
	}
	$this->critical( "Unreachable" );
	exit( 1 );
}
245
/**
 * Send a command to one particular slave.
 *
 * @param $slaveId int Index into the slave pipe list
 * @param $args mixed Command word, or array of command word and arguments;
 *   the elements are joined with spaces into a single line
 */
function dispatchToSlave( $slaveId, $args ) {
	$line = implode( ' ', (array)$args ) . "\n";
	fwrite( $this->slavePipes[$slaveId], $line );
}
254
/**
 * Queue a doPage command for every page that still has unmoved tracked text.
 * Page IDs are read in ascending batches of $batchSize.
 */
function doAllPages() {
	$dbr = wfGetDB( DB_SLAVE );
	$numPages = $dbr->selectField( 'blob_tracking',
		'COUNT(DISTINCT bt_page)',
		# A condition is required so that this query uses the index
		array( 'bt_moved' => 0 ),
		__METHOD__
	);
	$this->info( $this->copyOnly ? "Copying pages..." : "Moving pages..." );

	$numQueued = 0;
	$lastPageId = 0;
	for ( ;; ) {
		// Fetch the next batch of page IDs above the last one queued
		$res = $dbr->select( 'blob_tracking',
			array( 'bt_page' ),
			array(
				'bt_moved' => 0,
				'bt_page > ' . $dbr->addQuotes( $lastPageId )
			),
			__METHOD__,
			array(
				'DISTINCT',
				'ORDER BY' => 'bt_page',
				'LIMIT' => $this->batchSize,
			)
		);
		if ( $res->numRows() == 0 ) {
			break;
		}
		foreach ( $res as $row ) {
			$this->dispatch( 'doPage', $row->bt_page );
			$lastPageId = $row->bt_page;
			$numQueued++;
		}
		$this->report( 'pages', $numQueued, $numPages );
	}
	$this->report( 'pages', $numQueued, $numPages );
	$this->info( $this->copyOnly ? "All page copies queued." : "All page moves queued." );
}
304
/**
 * Print a progress message, but only on every $reportingInterval-th call
 * or when $current has reached $end. Also waits for the slaves whenever a
 * message is printed.
 */
function report( $label, $current, $end ) {
	$this->numBatches++;
	$finished = ( $current == $end );
	if ( !$finished && $this->numBatches < $this->reportingInterval ) {
		return;
	}
	$this->numBatches = 0;
	$this->info( "$label: $current / $end" );
	wfWaitForSlaves( 5 );
}
316
/**
 * Queue doOrphanList commands for all unmoved text that belongs to no page
 * (bt_page = 0). Returns silently if there is none.
 */
function doAllOrphans() {
	$dbr = wfGetDB( DB_SLAVE );
	$numOrphans = $dbr->selectField( 'blob_tracking',
		'COUNT(DISTINCT bt_text_id)',
		array( 'bt_moved' => 0, 'bt_page' => 0 ),
		__METHOD__ );
	if ( !$numOrphans ) {
		return;
	}
	$this->info( $this->copyOnly ? "Copying orphans..." : "Moving orphans..." );

	$numQueued = 0;
	$lastTextId = 0;
	for ( ;; ) {
		$res = $dbr->select( 'blob_tracking',
			array( 'bt_text_id' ),
			array(
				'bt_moved' => 0,
				'bt_page' => 0,
				'bt_text_id > ' . $dbr->addQuotes( $lastTextId )
			),
			__METHOD__,
			array(
				'DISTINCT',
				'ORDER BY' => 'bt_text_id',
				'LIMIT' => $this->batchSize
			)
		);
		if ( $res->numRows() == 0 ) {
			break;
		}
		$ids = array();
		foreach ( $res as $row ) {
			$ids[] = $row->bt_text_id;
			$lastTextId = $row->bt_text_id;
			$numQueued++;
		}

		// Need to send enough orphan IDs to the child at a time to fill a blob,
		// so orphanBatchSize needs to be at least ~100.
		// batchSize can be smaller or larger.
		// The final slice may be shorter than orphanBatchSize; it is still sent.
		$numIds = count( $ids );
		for ( $offset = 0; $offset < $numIds; $offset += $this->orphanBatchSize ) {
			$args = array_slice( $ids, $offset, $this->orphanBatchSize );
			array_unshift( $args, 'doOrphanList' );
			call_user_func_array( array( $this, 'dispatch' ), $args );
		}

		$this->report( 'orphans', $numQueued, $numOrphans );
	}
	$this->report( 'orphans', $numQueued, $numOrphans );
	$this->info( "All orphans queued." );
}
381
/**
 * Worker main loop: read one command per line from stdin and run it,
 * until "quit" is received or stdin reaches end of file.
 *
 * Recognised commands are "doPage <page id>", "doOrphanList <text id>..."
 * and "quit". Anything else is ignored.
 */
function executeChild() {
	$this->debug( 'starting' );
	$this->syncDBs();

	while ( !feof( STDIN ) ) {
		$line = rtrim( fgets( STDIN ) );
		if ( $line == '' ) {
			continue;
		}
		$this->debug( $line );
		$words = explode( ' ', $line );
		$cmd = array_shift( $words );
		if ( $cmd == 'quit' ) {
			return;
		}
		if ( $cmd == 'doPage' ) {
			$this->doPage( intval( $words[0] ) );
		} elseif ( $cmd == 'doOrphanList' ) {
			$this->doOrphanList( array_map( 'intval', $words ) );
		}
		wfWaitForSlaves( 5 );
	}
}
410
/**
 * Move (or, with --copy-only, copy) all tracked text of one page.
 *
 * Text rows are read in batches of $batchSize and added to a blob; each
 * time the blob reports that it is full it is committed and a new one is
 * started.
 *
 * @param $pageId int
 */
function doPage( $pageId ) {
	$title = Title::newFromId( $pageId );
	// The title is only used for debug output
	$titleText = $title ? $title->getPrefixedText() : '[deleted]';
	$dbr = wfGetDB( DB_SLAVE );

	// Finish any incomplete transactions
	if ( !$this->copyOnly ) {
		$this->finishIncompleteMoves( array( 'bt_page' => $pageId ) );
		$this->syncDBs();
	}

	$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
	$startId = 0;

	for ( ;; ) {
		$res = $dbr->select(
			array( 'blob_tracking', 'text' ),
			'*',
			array(
				'bt_page' => $pageId,
				'bt_text_id > ' . $dbr->addQuotes( $startId ),
				'bt_moved' => 0,
				'bt_new_url IS NULL',
				'bt_text_id=old_id',
			),
			__METHOD__,
			array(
				'ORDER BY' => 'bt_text_id',
				'LIMIT' => $this->batchSize
			)
		);
		if ( $res->numRows() == 0 ) {
			break;
		}

		$prevTextId = 0;
		foreach ( $res as $row ) {
			// The next batch starts after the last row seen, skipped or not
			$startId = $row->bt_text_id;
			if ( $prevTextId == $row->bt_text_id ) {
				// Duplicate (null edit)
				continue;
			}
			$prevTextId = $row->bt_text_id;

			// Load the text
			$text = Revision::getRevisionText( $row );
			if ( $text === false ) {
				$this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
				continue;
			}

			// Queue it; a false return means the blob is ready to commit
			$wantsMore = $trx->addItem( $text, $row->bt_text_id );
			if ( !$wantsMore ) {
				$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
				$trx->commit();
				$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
			}
		}
	}

	// Commit whatever is left in the last blob
	$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
	$trx->commit();
}
480
/**
 * Atomic move operation.
 *
 * Points the text row at the new URL and sets bt_moved in blob_tracking.
 * Both updates happen in one transaction so that the script can be
 * restarted without data loss. The transaction is kept short to reduce
 * locking.
 *
 * Must not be called in --copy-only mode; doing so aborts the script.
 *
 * @param $textId int old_id / bt_text_id of the row to move
 * @param $url string New external store URL
 */
function moveTextRow( $textId, $url ) {
	if ( $this->copyOnly ) {
		$this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" );
		exit( 1 );
	}

	$newTextFields = array(
		'old_text' => $url,
		'old_flags' => 'external,utf-8',
	);

	$dbw = wfGetDB( DB_MASTER );
	$dbw->begin();
	$dbw->update( 'text',
		$newTextFields,
		array( 'old_id' => $textId ),
		__METHOD__
	);
	$dbw->update( 'blob_tracking',
		array( 'bt_moved' => 1 ),
		array( 'bt_text_id' => $textId ),
		__METHOD__
	);
	$dbw->commit();
}
515
/**
 * Moves are done in two phases: bt_new_url and then bt_moved.
 *   - bt_new_url indicates that the text has been copied to the new cluster.
 *   - bt_moved indicates that the text table has been updated.
 *
 * This function completes any moves that have only reached the bt_new_url
 * phase. That happens when the script is interrupted, or when --copy-only
 * is used.
 *
 * @param $conds array Extra conditions restricting which blob_tracking rows
 *   are considered
 */
function finishIncompleteMoves( $conds ) {
	$dbr = wfGetDB( DB_SLAVE );

	$baseConds = array_merge( $conds, array(
		'bt_moved' => 0,
		'bt_new_url IS NOT NULL'
	));
	$lastTextId = 0;
	for ( ;; ) {
		$batchConds = array_merge(
			$baseConds,
			array( 'bt_text_id > ' . $dbr->addQuotes( $lastTextId ) )
		);
		$res = $dbr->select( 'blob_tracking',
			'*',
			$batchConds,
			__METHOD__,
			array(
				'ORDER BY' => 'bt_text_id',
				'LIMIT' => $this->batchSize,
			)
		);
		$numRows = $res->numRows();
		if ( !$numRows ) {
			break;
		}
		$this->debug( 'Incomplete: ' . $numRows . ' rows' );
		foreach ( $res as $row ) {
			$this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
			$lastTextId = $row->bt_text_id;
		}
	}
}
552
/**
 * Get the name of the next target cluster, cycling through the list.
 *
 * The array pointer is advanced before reading, and rewound to the first
 * cluster once the end of the list has been passed.
 */
function getTargetCluster() {
	$cluster = next( $this->destClusters );
	return ( $cluster === false ) ? reset( $this->destClusters ) : $cluster;
}
563
/**
 * Get a master connection to the named external storage cluster
 *
 * @param $cluster string Cluster name
 */
function getExtDB( $cluster ) {
	$factory = wfGetLBFactory();
	$loadBalancer = $factory->getExternalLB( $cluster );
	return $loadBalancer->getConnection( DB_MASTER );
}
571
/**
 * Move (or, with --copy-only, copy) a list of orphan text rows to the new
 * cluster, packing them into as few blobs as possible.
 *
 * @param $textIds array List of text IDs (old_id)
 */
function doOrphanList( $textIds ) {
	// Nothing to move
	if ( !count( $textIds ) ) {
		return;
	}

	// Finish incomplete moves
	if ( !$this->copyOnly ) {
		$this->finishIncompleteMoves( array( 'bt_text_id' => $textIds ) );
		$this->syncDBs();
	}

	$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );

	$res = wfGetDB( DB_SLAVE )->select(
		array( 'text', 'blob_tracking' ),
		array( 'old_id', 'old_text', 'old_flags' ),
		array(
			'old_id' => $textIds,
			'bt_text_id=old_id',
			'bt_moved' => 0,
		),
		__METHOD__,
		array( 'DISTINCT' )
	);

	foreach ( $res as $row ) {
		$text = Revision::getRevisionText( $row );
		if ( $text === false ) {
			// Report the ID of the row that failed
			$this->critical( "Error: cannot load revision text for old_id={$row->old_id}" );
			continue;
		}

		// A false return means the blob is ready to commit
		if ( !$trx->addItem( $text, $row->old_id ) ) {
			$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
			$trx->commit();
			$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
		}
	}
	// Commit whatever is left in the last blob
	$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
	$trx->commit();
}
612 }
613
/**
 * Class to represent a recompression operation for a single CGZ blob
 */
class CgzCopyTransaction {
	# The RecompressTracked object that created this transaction
	var $parent;
	# Name of the blob class to instantiate
	var $blobClass;
	# The blob object, or false until the first item is added
	var $cgz;
	# Map of text ID to the hash returned by the blob's addItem()
	var $referrers;
	# Map of text ID to the text that was added
	var $texts;

	/**
	 * Create a transaction from a RecompressTracked object
	 *
	 * @param $parent RecompressTracked
	 * @param $blobClass string Name of the blob class to use
	 */
	function __construct( $parent, $blobClass ) {
		$this->blobClass = $blobClass;
		$this->cgz = false;
		$this->texts = array();
		$this->referrers = array();
		$this->parent = $parent;
	}

	/**
	 * Add text.
	 * Returns false if it's ready to commit.
	 *
	 * @param $text string
	 * @param $textId int
	 * @return bool The result of the blob's isHappy()
	 */
	function addItem( $text, $textId ) {
		if ( !$this->cgz ) {
			// Create the blob lazily, on the first item
			$class = $this->blobClass;
			$this->cgz = new $class;
		}
		$hash = $this->cgz->addItem( $text );
		$this->referrers[$textId] = $hash;
		$this->texts[$textId] = $text;
		return $this->cgz->isHappy();
	}

	/**
	 * Get the number of text items currently held
	 */
	function getSize() {
		return count( $this->texts );
	}

	/**
	 * Recompress text after some aberrant modification.
	 * Rebuilds the blob and the referrer map from the texts that remain.
	 */
	function recompress() {
		$class = $this->blobClass;
		$this->cgz = new $class;
		$this->referrers = array();
		foreach ( $this->texts as $textId => $text ) {
			$hash = $this->cgz->addItem( $text );
			$this->referrers[$textId] = $hash;
		}
	}

	/**
	 * Commit the blob.
	 * Does nothing if no text items have been added.
	 * May skip the move if --copy-only is set.
	 */
	function commit() {
		$originalCount = count( $this->texts );
		if ( !$originalCount ) {
			return;
		}

		// Check to see if the target text_ids have been moved already.
		//
		// We originally read from the slave, so this can happen when a single
		// text_id is shared between multiple pages. It's rare, but possible
		// if a delete/move/undelete cycle splits up a null edit.
		//
		// We do a locking read to prevent closer-run race conditions.
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin();
		$res = $dbw->select( 'blob_tracking',
			array( 'bt_text_id', 'bt_moved' ),
			array( 'bt_text_id' => array_keys( $this->referrers ) ),
			__METHOD__, array( 'FOR UPDATE' ) );
		$dirty = false;
		foreach ( $res as $row ) {
			if ( $row->bt_moved ) {
				# This row has already been moved, remove it
				$this->parent->debug( "TRX: conflict detected in old_id={$row->bt_text_id}" );
				unset( $this->texts[$row->bt_text_id] );
				$dirty = true;
			}
		}

		// Recompress the blob if necessary
		if ( $dirty ) {
			if ( !count( $this->texts ) ) {
				// All have been moved already
				if ( $originalCount > 1 ) {
					// This is suspicious, make noise.
					// The logging methods live on the parent object.
					$this->parent->critical( "Warning: concurrent operation detected, are there two conflicting " .
						"processes running, doing the same job?" );
				}
				// Nothing was written; end the transaction so that the
				// locking read above does not stay open after we return
				$dbw->commit();
				return;
			}
			$this->recompress();
		}

		// Insert the data into the destination cluster
		$targetCluster = $this->parent->getTargetCluster();
		$store = $this->parent->store;
		$targetDB = $store->getMaster( $targetCluster );
		$targetDB->clearFlag( DBO_TRX ); // we manage the transactions
		$targetDB->begin();
		$baseUrl = $this->parent->store->store( $targetCluster, serialize( $this->cgz ) );

		// Write the new URLs to the blob_tracking table
		foreach ( $this->referrers as $textId => $hash ) {
			$url = $baseUrl . '/' . $hash;
			$dbw->update( 'blob_tracking',
				array( 'bt_new_url' => $url ),
				array(
					'bt_text_id' => $textId,
					'bt_moved' => 0, # Check for concurrent conflicting update
				),
				__METHOD__
			);
		}

		$targetDB->commit();
		// Critical section here: interruption at this point causes blob duplication
		// Reversing the order of the commits would cause data loss instead
		$dbw->commit();

		// Write the new URLs to the text table and set the moved flag
		if ( !$this->parent->copyOnly ) {
			foreach ( $this->referrers as $textId => $hash ) {
				$url = $baseUrl . '/' . $hash;
				$this->parent->moveTextRow( $textId, $url );
			}
		}
	}
}
748