Merge "Fix param doc of ChangeTagsList::updateChangeTagsOnAll"
[lhc/web/wiklou.git] / maintenance / storage / recompressTracked.php
1 <?php
2 /**
3 * Moves blobs indexed by trackBlobs.php to a specified list of destination
4 * clusters, and recompresses them in the process.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License along
17 * with this program; if not, write to the Free Software Foundation, Inc.,
18 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
19 * http://www.gnu.org/copyleft/gpl.html
20 *
21 * @file
22 * @ingroup Maintenance ExternalStorage
23 */
24
25 use Wikimedia\Rdbms\IMaintainableDatabase;
26 use MediaWiki\Logger\LegacyLogger;
27 use MediaWiki\MediaWikiServices;
28 use MediaWiki\Shell\Shell;
29
// The global $optionsWithArgs is set before the include so that the
// command-line parser (presumably commandLine.inc) knows which options take a value.
$optionsWithArgs = RecompressTracked::getOptionsWithArgs();
require __DIR__ . '/../commandLine.inc';

// At least one destination cluster is required.
if ( count( $args ) < 1 ) {
	echo "Usage: php recompressTracked.php [options] <cluster> [... <cluster>...]
Moves blobs indexed by trackBlobs.php to a specified list of destination clusters,
and recompresses them in the process. Restartable.

Options:
	--procs <procs>       Set the number of child processes (default 1)
	--copy-only           Copy only, do not update the text table. Restart
	                      without this option to complete.
	--no-count            Do not count the items to be moved before starting;
	                      progress reports show the total as [unknown]
	--debug-log <file>    Log debugging data to the specified file
	--info-log <file>     Log progress messages to the specified file
	--critical-log <file> Log error messages to the specified file
";
	exit( 1 );
}

$job = RecompressTracked::newFromCommandLine( $args, $options );
$job->execute();
51
/**
 * Maintenance script that moves blobs indexed by trackBlobs.php to a specified
 * list of destination clusters, and recompresses them in the process.
 *
 * The same class runs in two roles. The parent walks the blob_tracking table and
 * dispatches commands. Child processes (started with --child) read those
 * commands from stdin and do the copying.
 *
 * @ingroup Maintenance ExternalStorage
 */
class RecompressTracked {
	/** @var string[] Destination cluster names, taken from the positional arguments */
	public $destClusters;
	/** @var int Row LIMIT used for each select against blob_tracking */
	public $batchSize = 1000;
	/** @var int Maximum number of orphan text IDs sent in one doOrphanList command */
	public $orphanBatchSize = 1000;
	/** @var int Number of report() calls between printed progress messages */
	public $reportingInterval = 10;
	/** @var int|string Number of child processes (string when set from the command line) */
	public $numProcs = 1;
	/** @var int report() calls since the last printed progress message */
	public $numBatches = 0;
	public $pageBlobClass, $orphanBlobClass;
	/** Child stdin pipes, child process handles, and the last child dispatched to */
	public $replicaPipes, $replicaProcs, $prevReplicaId;
	public $copyOnly = false;
	public $isChild = false;
	/** @var int|string|false Index of this child process; false in the parent */
	public $replicaId = false;
	public $noCount = false;
	public $debugLog, $infoLog, $criticalLog;
	/** @var ExternalStoreDB */
	public $store;

	private static $optionsWithArgs = [
		'procs',
		'replica-id',
		'debug-log',
		'info-log',
		'critical-log'
	];

	/** Map of command-line option name => property name */
	private static $cmdLineOptionMap = [
		'no-count' => 'noCount',
		'procs' => 'numProcs',
		'copy-only' => 'copyOnly',
		'child' => 'isChild',
		'replica-id' => 'replicaId',
		'debug-log' => 'debugLog',
		'info-log' => 'infoLog',
		'critical-log' => 'criticalLog',
	];

	/**
	 * Command-line options that take a value.
	 * @return string[]
	 */
	public static function getOptionsWithArgs() {
		return self::$optionsWithArgs;
	}

	/**
	 * Build a job from parsed command-line arguments and options.
	 *
	 * @param string[] $args Destination cluster names
	 * @param array $options Parsed options keyed by command-line option name
	 * @return self
	 */
	public static function newFromCommandLine( $args, $options ) {
		$jobOptions = [ 'destClusters' => $args ];
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( isset( $options[$cmdOption] ) ) {
				$jobOptions[$classOption] = $options[$cmdOption];
			}
		}

		return new self( $jobOptions );
	}

	/**
	 * @param array $options Map of property name => value, copied onto $this
	 */
	public function __construct( $options ) {
		foreach ( $options as $name => $value ) {
			$this->$name = $value;
		}
		$esFactory = MediaWikiServices::getInstance()->getExternalStoreFactory();
		$this->store = $esFactory->getStore( 'DB' );
		if ( !$this->isChild ) {
			$GLOBALS['wgDebugLogPrefix'] = "RCT M: ";
		} elseif ( $this->replicaId !== false ) {
			$GLOBALS['wgDebugLogPrefix'] = "RCT {$this->replicaId}: ";
		}
		// Prefer diff-based blobs for page histories when the xdiff extension is available
		$this->pageBlobClass = function_exists( 'xdiff_string_bdiff' ) ?
			DiffHistoryBlob::class : ConcatenatedGzipHistoryBlob::class;
		$this->orphanBlobClass = ConcatenatedGzipHistoryBlob::class;
	}

	/**
	 * Send a message to the debug log and, if configured, to the debug log file.
	 * @param string $msg
	 */
	public function debug( $msg ) {
		wfDebug( "$msg\n" );
		if ( $this->debugLog ) {
			$this->logToFile( $msg, $this->debugLog );
		}
	}

	/**
	 * Print a progress message and, if configured, append it to the info log file.
	 * @param string $msg
	 */
	public function info( $msg ) {
		echo "$msg\n";
		if ( $this->infoLog ) {
			$this->logToFile( $msg, $this->infoLog );
		}
	}

	/**
	 * Print an error message and, if configured, append it to the critical log file.
	 * @param string $msg
	 */
	public function critical( $msg ) {
		echo "$msg\n";
		if ( $this->criticalLog ) {
			$this->logToFile( $msg, $this->criticalLog );
		}
	}

	/**
	 * Append a message to a log file with a timestamp/host/pid/wiki header.
	 * @param string $msg
	 * @param string $file
	 */
	public function logToFile( $msg, $file ) {
		$header = '[' . date( 'd\TH:i:s' ) . '] ' . wfHostname() . ' ' . posix_getpid();
		if ( $this->replicaId !== false ) {
			$header .= "({$this->replicaId})";
		}
		$header .= ' ' . WikiMap::getCurrentWikiDbDomain()->getId();
		LegacyLogger::emit( sprintf( "%-50s %s\n", $header, $msg ), $file );
	}

	/**
	 * Wait until the selected replica DB has caught up to the master.
	 * This allows us to use the replica DB for things that were committed in a
	 * previous part of this batch process.
	 */
	public function syncDBs() {
		$dbw = wfGetDB( DB_MASTER );
		$dbr = wfGetDB( DB_REPLICA );
		$pos = $dbw->getMasterPos();
		$dbr->masterPosWait( $pos, 100000 );
	}

	/**
	 * Execute parent or child depending on the isChild option
	 */
	public function execute() {
		if ( $this->isChild ) {
			$this->executeChild();
		} else {
			$this->executeParent();
		}
	}

	/**
	 * Execute the parent process
	 */
	public function executeParent() {
		if ( !$this->checkTrackingTable() ) {
			return;
		}

		$this->syncDBs();
		$this->startReplicaProcs();
		$this->doAllPages();
		$this->doAllOrphans();
		$this->killReplicaProcs();
	}

	/**
	 * Make sure the tracking table exists and isn't empty
	 * @return bool
	 */
	public function checkTrackingTable() {
		$dbr = wfGetDB( DB_REPLICA );
		if ( !$dbr->tableExists( 'blob_tracking' ) ) {
			$this->critical( "Error: blob_tracking table does not exist" );

			return false;
		}
		$row = $dbr->selectRow( 'blob_tracking', '*', '', __METHOD__ );
		if ( !$row ) {
			$this->info( "Warning: blob_tracking table contains no rows, skipping this wiki." );

			return false;
		}

		return true;
	}

	/**
	 * Start the worker processes.
	 * These processes will listen on stdin for commands.
	 * This is necessary because text recompression is slow: loading, compressing and
	 * writing are all slow.
	 */
	public function startReplicaProcs() {
		$wiki = WikiMap::getWikiIdFromDbDomain( WikiMap::getCurrentWikiDbDomain() );

		// Rebuild this process's own options for the children. Each child gets
		// its own --replica-id below.
		$cmd = 'php ' . Shell::escape( __FILE__ );
		foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) {
			if ( $cmdOption == 'replica-id' ) {
				continue;
			} elseif ( in_array( $cmdOption, self::$optionsWithArgs, true ) && isset( $this->$classOption ) ) {
				$cmd .= " --$cmdOption " . Shell::escape( $this->$classOption );
			} elseif ( $this->$classOption ) {
				$cmd .= " --$cmdOption";
			}
		}
		$cmd .= ' --child' .
			' --wiki ' . Shell::escape( $wiki ) .
			' ' . Shell::escape( ...$this->destClusters );

		$this->replicaPipes = $this->replicaProcs = [];
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$pipes = [];
			// Child reads commands from a pipe; its output goes straight to ours
			$spec = [
				[ 'pipe', 'r' ],
				[ 'file', 'php://stdout', 'w' ],
				[ 'file', 'php://stderr', 'w' ]
			];
			Wikimedia\suppressWarnings();
			$proc = proc_open( "$cmd --replica-id $i", $spec, $pipes );
			Wikimedia\restoreWarnings();
			if ( !$proc ) {
				$this->critical( "Error opening replica DB process: $cmd" );
				exit( 1 );
			}
			$this->replicaProcs[$i] = $proc;
			$this->replicaPipes[$i] = $pipes[0];
		}
		$this->prevReplicaId = -1;
	}

	/**
	 * Gracefully terminate the child processes
	 */
	public function killReplicaProcs() {
		$this->info( "Waiting for replica DB processes to finish..." );
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$this->dispatchToReplica( $i, 'quit' );
		}
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$status = proc_close( $this->replicaProcs[$i] );
			if ( $status ) {
				$this->critical( "Warning: child #$i exited with status $status" );
			}
		}
		$this->info( "Done." );
	}

	/**
	 * Dispatch a command to the next available replica DB.
	 * This may block until a replica DB finishes its work and becomes available.
	 * @param string|int ...$args Command name followed by its arguments
	 */
	public function dispatch( ...$args ) {
		$pipes = $this->replicaPipes;
		$x = [];
		$y = [];
		// stream_select() leaves only the writable pipes in $pipes, keyed by child index
		$numPipes = stream_select( $x, $pipes, $y, 3600 );
		if ( !$numPipes ) {
			$this->critical( "Error waiting to write to replica DBs. Aborting" );
			exit( 1 );
		}
		// Round-robin: start looking just after the child used last time
		for ( $i = 0; $i < $this->numProcs; $i++ ) {
			$replicaId = ( $i + $this->prevReplicaId + 1 ) % $this->numProcs;
			if ( isset( $pipes[$replicaId] ) ) {
				$this->prevReplicaId = $replicaId;
				$this->dispatchToReplica( $replicaId, $args );

				return;
			}
		}
		$this->critical( "Unreachable" );
		exit( 1 );
	}

	/**
	 * Dispatch a command to a specified replica DB
	 * @param int $replicaId
	 * @param array|string $args Command name followed by its arguments, or a bare command
	 */
	public function dispatchToReplica( $replicaId, $args ) {
		$args = (array)$args;
		$cmd = implode( ' ', $args );
		fwrite( $this->replicaPipes[$replicaId], "$cmd\n" );
	}

	/**
	 * Move all tracked pages to the new clusters
	 */
	public function doAllPages() {
		$dbr = wfGetDB( DB_REPLICA );
		$i = 0;
		$startId = 0;
		if ( $this->noCount ) {
			$numPages = '[unknown]';
		} else {
			$numPages = $dbr->selectField( 'blob_tracking',
				'COUNT(DISTINCT bt_page)',
				# A condition is required so that this query uses the index
				[ 'bt_moved' => 0 ],
				__METHOD__
			);
		}
		if ( $this->copyOnly ) {
			$this->info( "Copying pages..." );
		} else {
			$this->info( "Moving pages..." );
		}
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				[ 'bt_page' ],
				[
					'bt_moved' => 0,
					'bt_page > ' . $dbr->addQuotes( $startId )
				],
				__METHOD__,
				[
					'DISTINCT',
					'ORDER BY' => 'bt_page',
					'LIMIT' => $this->batchSize,
				]
			);
			if ( !$res->numRows() ) {
				break;
			}
			foreach ( $res as $row ) {
				$startId = $row->bt_page;
				$this->dispatch( 'doPage', $row->bt_page );
				$i++;
			}
			$this->report( 'pages', $i, $numPages );
		}
		$this->report( 'pages', $i, $numPages );
		if ( $this->copyOnly ) {
			$this->info( "All page copies queued." );
		} else {
			$this->info( "All page moves queued." );
		}
	}

	/**
	 * Display a progress report every $reportingInterval calls, or when done.
	 * @param string $label
	 * @param int $current
	 * @param int|string $end Total, or '[unknown]' when counting was skipped
	 */
	public function report( $label, $current, $end ) {
		$this->numBatches++;
		if ( $current == $end || $this->numBatches >= $this->reportingInterval ) {
			$this->numBatches = 0;
			$this->info( "$label: $current / $end" );
			MediaWikiServices::getInstance()->getDBLoadBalancerFactory()->waitForReplication();
		}
	}

	/**
	 * Move all orphan text to the new clusters
	 */
	public function doAllOrphans() {
		$dbr = wfGetDB( DB_REPLICA );
		$startId = 0;
		$i = 0;
		if ( $this->noCount ) {
			$numOrphans = '[unknown]';
		} else {
			$numOrphans = $dbr->selectField( 'blob_tracking',
				'COUNT(DISTINCT bt_text_id)',
				[ 'bt_moved' => 0, 'bt_page' => 0 ],
				__METHOD__ );
			if ( !$numOrphans ) {
				return;
			}
		}
		if ( $this->copyOnly ) {
			$this->info( "Copying orphans..." );
		} else {
			$this->info( "Moving orphans..." );
		}

		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				[ 'bt_text_id' ],
				[
					'bt_moved' => 0,
					'bt_page' => 0,
					'bt_text_id > ' . $dbr->addQuotes( $startId )
				],
				__METHOD__,
				[
					'DISTINCT',
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				]
			);
			if ( !$res->numRows() ) {
				break;
			}
			$ids = [];
			foreach ( $res as $row ) {
				$startId = $row->bt_text_id;
				$ids[] = $row->bt_text_id;
				$i++;
			}
			// Need to send enough orphan IDs to the child at a time to fill a blob,
			// so orphanBatchSize needs to be at least ~100.
			// batchSize can be smaller or larger.
			foreach ( array_chunk( $ids, $this->orphanBatchSize ) as $chunk ) {
				$this->dispatch( 'doOrphanList', ...$chunk );
			}

			$this->report( 'orphans', $i, $numOrphans );
		}
		$this->report( 'orphans', $i, $numOrphans );
		$this->info( "All orphans queued." );
	}

	/**
	 * Main entry point for worker processes.
	 * Reads one space-separated command per line from stdin until "quit" or EOF.
	 */
	public function executeChild() {
		$this->debug( 'starting' );
		$this->syncDBs();

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		while ( !feof( STDIN ) ) {
			$line = fgets( STDIN );
			if ( $line === false ) {
				// EOF or read error; without this a persistent read error
				// (feof() still false) would spin forever on empty lines.
				break;
			}
			$line = rtrim( $line );
			if ( $line === '' ) {
				continue;
			}
			$this->debug( $line );
			$args = explode( ' ', $line );
			$cmd = array_shift( $args );
			switch ( $cmd ) {
				case 'doPage':
					$this->doPage( intval( $args[0] ) );
					break;
				case 'doOrphanList':
					$this->doOrphanList( array_map( 'intval', $args ) );
					break;
				case 'quit':
					return;
				default:
					// The parent only sends the commands above; anything else
					// indicates a protocol problem, so make it visible.
					$this->critical( "Error: unknown command \"$cmd\" received from parent" );
			}
			$lbFactory->waitForReplication();
		}
	}

	/**
	 * Move tracked text in a given page
	 *
	 * @param int $pageId
	 */
	public function doPage( $pageId ) {
		$title = Title::newFromID( $pageId );
		if ( $title ) {
			$titleText = $title->getPrefixedText();
		} else {
			$titleText = '[deleted]';
		}
		$dbr = wfGetDB( DB_REPLICA );

		// Finish any incomplete transactions
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( [ 'bt_page' => $pageId ] );
			$this->syncDBs();
		}

		$startId = 0;
		$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		while ( true ) {
			$res = $dbr->select(
				[ 'blob_tracking', 'text' ],
				'*',
				[
					'bt_page' => $pageId,
					'bt_text_id > ' . $dbr->addQuotes( $startId ),
					'bt_moved' => 0,
					'bt_new_url IS NULL',
					'bt_text_id=old_id',
				],
				__METHOD__,
				[
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize
				]
			);
			if ( !$res->numRows() ) {
				break;
			}

			$lastTextId = 0;
			foreach ( $res as $row ) {
				$startId = $row->bt_text_id;
				if ( $lastTextId == $row->bt_text_id ) {
					// Duplicate (null edit)
					continue;
				}
				$lastTextId = $row->bt_text_id;
				// Load the text
				$text = Revision::getRevisionText( $row );
				if ( $text === false ) {
					$this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" );
					continue;
				}

				// Queue it; addItem() returns false once the blob is full
				if ( !$trx->addItem( $text, $row->bt_text_id ) ) {
					$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
					$trx->commit();
					$trx = new CgzCopyTransaction( $this, $this->pageBlobClass );
					$lbFactory->waitForReplication();
				}
			}
		}

		$this->debug( "$titleText: committing blob with " . $trx->getSize() . " items" );
		$trx->commit();
	}

	/**
	 * Atomic move operation.
	 *
	 * Write the new URL to the text table and set the bt_moved flag.
	 *
	 * This is done in a single transaction to provide restartable behavior
	 * without data loss.
	 *
	 * The transaction is kept short to reduce locking.
	 *
	 * @param int $textId
	 * @param string $url
	 */
	public function moveTextRow( $textId, $url ) {
		if ( $this->copyOnly ) {
			$this->critical( "Internal error: can't call moveTextRow() in --copy-only mode" );
			exit( 1 );
		}
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin( __METHOD__ );
		$dbw->update( 'text',
			[ // set
				'old_text' => $url,
				'old_flags' => 'external,utf-8',
			],
			[ // where
				'old_id' => $textId
			],
			__METHOD__
		);
		$dbw->update( 'blob_tracking',
			[ 'bt_moved' => 1 ],
			[ 'bt_text_id' => $textId ],
			__METHOD__
		);
		$dbw->commit( __METHOD__ );
	}

	/**
	 * Moves are done in two phases: bt_new_url and then bt_moved.
	 *  - bt_new_url indicates that the text has been copied to the new cluster.
	 *  - bt_moved indicates that the text table has been updated.
	 *
	 * This function completes any moves that only have done bt_new_url. This
	 * can happen when the script is interrupted, or when --copy-only is used.
	 *
	 * @param array $conds Extra conditions on blob_tracking (e.g. bt_page or bt_text_id)
	 */
	public function finishIncompleteMoves( $conds ) {
		$dbr = wfGetDB( DB_REPLICA );
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();

		$startId = 0;
		$numMoved = 0;
		$conds = array_merge( $conds, [
			'bt_moved' => 0,
			'bt_new_url IS NOT NULL'
		] );
		while ( true ) {
			$res = $dbr->select( 'blob_tracking',
				'*',
				array_merge( $conds, [ 'bt_text_id > ' . $dbr->addQuotes( $startId ) ] ),
				__METHOD__,
				[
					'ORDER BY' => 'bt_text_id',
					'LIMIT' => $this->batchSize,
				]
			);
			if ( !$res->numRows() ) {
				break;
			}
			$this->debug( 'Incomplete: ' . $res->numRows() . ' rows' );
			foreach ( $res as $row ) {
				$startId = $row->bt_text_id;
				$this->moveTextRow( $row->bt_text_id, $row->bt_new_url );
				// Throttle on the number of rows written rather than on the ID
				// value: a sparse ID set may contain no multiples of 10 at all,
				// which would never wait for replication.
				if ( ++$numMoved % 10 === 0 ) {
					$lbFactory->waitForReplication();
				}
			}
		}
	}

	/**
	 * Returns the name of the next target cluster, cycling through
	 * $destClusters using the array's internal pointer.
	 * @return string
	 */
	public function getTargetCluster() {
		$cluster = next( $this->destClusters );
		if ( $cluster === false ) {
			$cluster = reset( $this->destClusters );
		}

		return $cluster;
	}

	/**
	 * Gets a DB master connection for the given external cluster name
	 * @param string $cluster
	 * @return IMaintainableDatabase
	 */
	public function getExtDB( $cluster ) {
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$lb = $lbFactory->getExternalLB( $cluster );

		return $lb->getMaintenanceConnectionRef( DB_MASTER );
	}

	/**
	 * Move a list of orphan text_ids to the new cluster
	 *
	 * @param int[] $textIds
	 */
	public function doOrphanList( $textIds ) {
		// Finish incomplete moves
		if ( !$this->copyOnly ) {
			$this->finishIncompleteMoves( [ 'bt_text_id' => $textIds ] );
			$this->syncDBs();
		}

		$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );

		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		$res = wfGetDB( DB_REPLICA )->select(
			[ 'text', 'blob_tracking' ],
			[ 'old_id', 'old_text', 'old_flags' ],
			[
				'old_id' => $textIds,
				'bt_text_id=old_id',
				'bt_moved' => 0,
			],
			__METHOD__,
			[ 'DISTINCT' ]
		);

		foreach ( $res as $row ) {
			$text = Revision::getRevisionText( $row );
			if ( $text === false ) {
				$this->critical( "Error: cannot load revision text for old_id={$row->old_id}" );
				continue;
			}

			if ( !$trx->addItem( $text, $row->old_id ) ) {
				$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
				$trx->commit();
				$trx = new CgzCopyTransaction( $this, $this->orphanBlobClass );
				$lbFactory->waitForReplication();
			}
		}
		$this->debug( "[orphan]: committing blob with " . $trx->getSize() . " rows" );
		$trx->commit();
	}
}
705
/**
 * Class to represent a recompression operation for a single CGZ blob
 */
class CgzCopyTransaction {
	/** @var RecompressTracked */
	public $parent;
	/** @var string Name of the history blob class to instantiate */
	public $blobClass;
	/** @var ConcatenatedGzipHistoryBlob|false Blob being filled; false until the first item is added */
	public $cgz;
	/** @var array Map of text ID => value returned by the blob's addItem() for that text */
	public $referrers = [];
	/** @var string[] Map of text ID => uncompressed text, kept so the blob can be rebuilt */
	public $texts = [];

	/**
	 * Create a transaction from a RecompressTracked object
	 * @param RecompressTracked $parent
	 * @param string $blobClass
	 */
	public function __construct( $parent, $blobClass ) {
		$this->blobClass = $blobClass;
		$this->cgz = false;
		$this->texts = [];
		$this->referrers = [];
		$this->parent = $parent;
	}

	/**
	 * Add text.
	 * Returns false if it's ready to commit.
	 * @param string $text
	 * @param int $textId
	 * @return bool
	 */
	public function addItem( $text, $textId ) {
		if ( !$this->cgz ) {
			$class = $this->blobClass;
			$this->cgz = new $class;
		}
		$hash = $this->cgz->addItem( $text );
		$this->referrers[$textId] = $hash;
		$this->texts[$textId] = $text;

		return $this->cgz->isHappy();
	}

	/**
	 * @return int Number of texts queued in this transaction
	 */
	public function getSize() {
		return count( $this->texts );
	}

	/**
	 * Recompress text after some aberrant modification: rebuild the blob and
	 * the referrer map from the texts that remain.
	 */
	public function recompress() {
		$class = $this->blobClass;
		$this->cgz = new $class;
		$this->referrers = [];
		foreach ( $this->texts as $textId => $text ) {
			$hash = $this->cgz->addItem( $text );
			$this->referrers[$textId] = $hash;
		}
	}

	/**
	 * Commit the blob.
	 * Does nothing if no text items have been added.
	 * May skip the move if --copy-only is set.
	 */
	public function commit() {
		$originalCount = count( $this->texts );
		if ( !$originalCount ) {
			return;
		}

		/* Check to see if the target text_ids have been moved already.
		 *
		 * We originally read from the replica DB, so this can happen when a single
		 * text_id is shared between multiple pages. It's rare, but possible
		 * if a delete/move/undelete cycle splits up a null edit.
		 *
		 * We do a locking read to prevent closer-run race conditions.
		 */
		$dbw = wfGetDB( DB_MASTER );
		$dbw->begin( __METHOD__ );
		$res = $dbw->select( 'blob_tracking',
			[ 'bt_text_id', 'bt_moved' ],
			[ 'bt_text_id' => array_keys( $this->referrers ) ],
			__METHOD__, [ 'FOR UPDATE' ] );
		$dirty = false;
		foreach ( $res as $row ) {
			if ( $row->bt_moved ) {
				# This row has already been moved, remove it
				$this->parent->debug( "TRX: conflict detected in old_id={$row->bt_text_id}" );
				unset( $this->texts[$row->bt_text_id] );
				$dirty = true;
			}
		}

		// Recompress the blob if necessary
		if ( $dirty ) {
			if ( !count( $this->texts ) ) {
				// All have been moved already
				if ( $originalCount > 1 ) {
					// This is suspicious, make noise
					$this->parent->critical(
						"Warning: concurrent operation detected, are there two conflicting " .
						"processes running, doing the same job?" );
				}
				// Nothing was written. End the transaction opened above so the
				// FOR UPDATE row locks are released instead of staying held on
				// the master connection.
				$dbw->commit( __METHOD__ );

				return;
			}
			$this->recompress();
		}

		// Insert the data into the destination cluster
		$targetCluster = $this->parent->getTargetCluster();
		$store = $this->parent->store;
		$targetDB = $store->getMaster( $targetCluster );
		$targetDB->clearFlag( DBO_TRX ); // we manage the transactions
		$targetDB->begin( __METHOD__ );
		$baseUrl = $store->store( $targetCluster, serialize( $this->cgz ) );

		// Write the new URLs to the blob_tracking table
		foreach ( $this->referrers as $textId => $hash ) {
			$url = $baseUrl . '/' . $hash;
			$dbw->update( 'blob_tracking',
				[ 'bt_new_url' => $url ],
				[
					'bt_text_id' => $textId,
					'bt_moved' => 0, # Check for concurrent conflicting update
				],
				__METHOD__
			);
		}

		$targetDB->commit( __METHOD__ );
		// Critical section here: interruption at this point causes blob duplication
		// Reversing the order of the commits would cause data loss instead
		$dbw->commit( __METHOD__ );

		// Write the new URLs to the text table and set the moved flag
		if ( !$this->parent->copyOnly ) {
			foreach ( $this->referrers as $textId => $hash ) {
				$url = $baseUrl . '/' . $hash;
				$this->parent->moveTextRow( $textId, $url );
			}
		}
	}
}