X-Git-Url: https://git.heureux-cyclage.org/?p=lhc%2Fweb%2Fwiklou.git;a=blobdiff_plain;f=maintenance%2Fstorage%2FrecompressTracked.php;h=00680462c6e2add3e3e88ff96a2abf23eb69fbb0;hp=316d2d2319bdbbec9c464f5076cd3923e6067b7f;hb=72e141a5e33417a3f4bcb18cf8a862634bda9bee;hpb=f9bb20657c6776a7543fbf33ead1c1e6fbad33de diff --git a/maintenance/storage/recompressTracked.php b/maintenance/storage/recompressTracked.php index 316d2d2319..00680462c6 100644 --- a/maintenance/storage/recompressTracked.php +++ b/maintenance/storage/recompressTracked.php @@ -22,6 +22,7 @@ * @ingroup Maintenance ExternalStorage */ +use MediaWiki\Storage\SqlBlobStore; use Wikimedia\Rdbms\IMaintainableDatabase; use MediaWiki\Logger\LegacyLogger; use MediaWiki\MediaWikiServices; @@ -63,18 +64,20 @@ class RecompressTracked { public $numProcs = 1; public $numBatches = 0; public $pageBlobClass, $orphanBlobClass; - public $replicaPipes, $replicaProcs, $prevReplicaId; + public $childPipes, $childProcs, $prevChildId; public $copyOnly = false; public $isChild = false; - public $replicaId = false; + public $childId = false; public $noCount = false; public $debugLog, $infoLog, $criticalLog; /** @var ExternalStoreDB */ public $store; + /** @var SqlBlobStore */ + private $blobStore; private static $optionsWithArgs = [ 'procs', - 'replica-id', + 'child-id', 'debug-log', 'info-log', 'critical-log' @@ -85,7 +88,7 @@ class RecompressTracked { 'procs' => 'numProcs', 'copy-only' => 'copyOnly', 'child' => 'isChild', - 'replica-id' => 'replicaId', + 'child-id' => 'childId', 'debug-log' => 'debugLog', 'info-log' => 'infoLog', 'critical-log' => 'criticalLog', @@ -114,12 +117,16 @@ class RecompressTracked { $this->store = $esFactory->getStore( 'DB' ); if ( !$this->isChild ) { $GLOBALS['wgDebugLogPrefix'] = "RCT M: "; - } elseif ( $this->replicaId !== false ) { - $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->replicaId}: "; + } elseif ( $this->childId !== false ) { + $GLOBALS['wgDebugLogPrefix'] = "RCT {$this->childId}: "; } $this->pageBlobClass = function_exists( 'xdiff_string_bdiff' ) ? DiffHistoryBlob::class : ConcatenatedGzipHistoryBlob::class; $this->orphanBlobClass = ConcatenatedGzipHistoryBlob::class; + // @phan-suppress-next-line PhanAccessMethodInternal + $this->blobStore = MediaWikiServices::getInstance() + ->getBlobStoreFactory() + ->newSqlBlobStore(); } function debug( $msg ) { @@ -145,8 +152,8 @@ class RecompressTracked { function logToFile( $msg, $file ) { $header = '[' . date( 'd\TH:i:s' ) . '] ' . wfHostname() . ' ' . posix_getpid(); - if ( $this->replicaId !== false ) { - $header .= "({$this->replicaId})"; + if ( $this->childId !== false ) { + $header .= "({$this->childId})"; } $header .= ' ' . WikiMap::getCurrentWikiDbDomain()->getId(); LegacyLogger::emit( sprintf( "%-50s %s\n", $header, $msg ), $file ); @@ -184,10 +191,10 @@ class RecompressTracked { } $this->syncDBs(); - $this->startReplicaProcs(); + $this->startChildProcs(); $this->doAllPages(); $this->doAllOrphans(); - $this->killReplicaProcs(); + $this->killChildProcs(); } /** @@ -217,12 +224,12 @@ class RecompressTracked { * This necessary because text recompression is slow: loading, compressing and * writing are all slow. */ - function startReplicaProcs() { + function startChildProcs() { $wiki = WikiMap::getWikiIdFromDbDomain( WikiMap::getCurrentWikiDbDomain() ); $cmd = 'php ' . Shell::escape( __FILE__ ); foreach ( self::$cmdLineOptionMap as $cmdOption => $classOption ) { - if ( $cmdOption == 'replica-id' ) { + if ( $cmdOption == 'child-id' ) { continue; } elseif ( in_array( $cmdOption, self::$optionsWithArgs ) && isset( $this->$classOption ) ) { $cmd .= " --$cmdOption " . Shell::escape( $this->$classOption ); @@ -234,7 +241,7 @@ class RecompressTracked { ' --wiki ' . Shell::escape( $wiki ) . ' ' . Shell::escape( ...$this->destClusters ); - $this->replicaPipes = $this->replicaProcs = []; + $this->childPipes = $this->childProcs = []; for ( $i = 0; $i < $this->numProcs; $i++ ) { $pipes = []; $spec = [ @@ -243,28 +250,28 @@ class RecompressTracked { [ 'file', 'php://stderr', 'w' ] ]; Wikimedia\suppressWarnings(); - $proc = proc_open( "$cmd --replica-id $i", $spec, $pipes ); + $proc = proc_open( "$cmd --child-id $i", $spec, $pipes ); Wikimedia\restoreWarnings(); if ( !$proc ) { - $this->critical( "Error opening replica DB process: $cmd" ); + $this->critical( "Error opening child process: $cmd" ); exit( 1 ); } - $this->replicaProcs[$i] = $proc; - $this->replicaPipes[$i] = $pipes[0]; + $this->childProcs[$i] = $proc; + $this->childPipes[$i] = $pipes[0]; } - $this->prevReplicaId = -1; + $this->prevChildId = -1; } /** * Gracefully terminate the child processes */ - function killReplicaProcs() { - $this->info( "Waiting for replica DB processes to finish..." ); + function killChildProcs() { + $this->info( "Waiting for child processes to finish..." ); for ( $i = 0; $i < $this->numProcs; $i++ ) { - $this->dispatchToReplica( $i, 'quit' ); + $this->dispatchToChild( $i, 'quit' ); } for ( $i = 0; $i < $this->numProcs; $i++ ) { - $status = proc_close( $this->replicaProcs[$i] ); + $status = proc_close( $this->childProcs[$i] ); if ( $status ) { $this->critical( "Warning: child #$i exited with status $status" ); } @@ -273,24 +280,24 @@ class RecompressTracked { } /** - * Dispatch a command to the next available replica DB. - * This may block until a replica DB finishes its work and becomes available. - * @param array ...$args + * Dispatch a command to the next available child process. + * This may block until a child process finishes its work and becomes available. + * @param array|string ...$args */ function dispatch( ...$args ) { - $pipes = $this->replicaPipes; + $pipes = $this->childPipes; $x = []; $y = []; $numPipes = stream_select( $x, $pipes, $y, 3600 ); if ( !$numPipes ) { - $this->critical( "Error waiting to write to replica DBs. Aborting" ); + $this->critical( "Error waiting to write to child process. Aborting" ); exit( 1 ); } for ( $i = 0; $i < $this->numProcs; $i++ ) { - $replicaId = ( $i + $this->prevReplicaId + 1 ) % $this->numProcs; - if ( isset( $pipes[$replicaId] ) ) { - $this->prevReplicaId = $replicaId; - $this->dispatchToReplica( $replicaId, $args ); + $childId = ( $i + $this->prevChildId + 1 ) % $this->numProcs; + if ( isset( $pipes[$childId] ) ) { + $this->prevChildId = $childId; + $this->dispatchToChild( $childId, $args ); return; } @@ -300,14 +307,14 @@ class RecompressTracked { } /** - * Dispatch a command to a specified replica DB - * @param int $replicaId + * Dispatch a command to a specified child process + * @param int $childId * @param array|string $args */ - function dispatchToReplica( $replicaId, $args ) { + function dispatchToChild( $childId, $args ) { $args = (array)$args; $cmd = implode( ' ', $args ); - fwrite( $this->replicaPipes[$replicaId], "$cmd\n" ); + fwrite( $this->childPipes[$childId], "$cmd\n" ); } /** @@ -531,7 +538,7 @@ class RecompressTracked { } $lastTextId = $row->bt_text_id; // Load the text - $text = Revision::getRevisionText( $row ); + $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags ); if ( $text === false ) { $this->critical( "Error loading {$row->bt_rev_id}/{$row->bt_text_id}" ); continue; @@ -685,7 +692,7 @@ class RecompressTracked { ); foreach ( $res as $row ) { - $text = Revision::getRevisionText( $row ); + $text = $this->blobStore->expandBlob( $row->old_text, $row->old_flags ); if ( $text === false ) { $this->critical( "Error: cannot load revision text for old_id={$row->old_id}" ); continue; @@ -713,6 +720,8 @@ class CgzCopyTransaction { /** @var ConcatenatedGzipHistoryBlob|false */ public $cgz; public $referrers; + /** @var array */ + private $texts; /** * Create a transaction from a RecompressTracked object