use Wikimedia\Rdbms\Database;
use Wikimedia\Rdbms\DBConnRef;
use Wikimedia\Rdbms\IDatabase;
-use Wikimedia\Rdbms\LoadBalancer;
+use Wikimedia\Rdbms\ILoadBalancer;
/**
* Service for looking up page revisions.
private $contentHandlerUseDB = true;
/**
- * @var LoadBalancer
+ * @var ILoadBalancer
*/
private $loadBalancer;
/**
* @todo $blobStore should be allowed to be any BlobStore!
*
- * @param LoadBalancer $loadBalancer
+ * @param ILoadBalancer $loadBalancer
* @param SqlBlobStore $blobStore
- * @param WANObjectCache $cache
+ * @param WANObjectCache $cache A cache for caching revision rows. This can be the local
+ * wiki's default instance even if $wikiId refers to a different wiki, since
+ * makeGlobalKey() is used to constructed a key that allows cached revision rows from
+ * the same database to be re-used between wikis. For example, enwiki and frwiki will
+ * use the same cache keys for revision rows from the wikidatawiki database, regardless
+ * of the cache's default key space.
* @param CommentStore $commentStore
* @param NameTableStore $contentModelStore
* @param NameTableStore $slotRoleStore
* @throws MWException if $mcrMigrationStage or $wikiId is invalid.
*/
public function __construct(
- LoadBalancer $loadBalancer,
+ ILoadBalancer $loadBalancer,
SqlBlobStore $blobStore,
WANObjectCache $cache,
CommentStore $commentStore,
return ( $this->mcrMigrationStage & $flags ) === $flags;
}
+ /**
+ * Throws a RevisionAccessException if this RevisionStore is configured for cross-wiki loading
+ * and still reading from the old DB schema.
+ *
+ * @throws RevisionAccessException
+ */
+ private function assertCrossWikiContentLoadingIsSafe() {
+ if ( $this->wikiId !== false && $this->hasMcrSchemaFlags( SCHEMA_COMPAT_READ_OLD ) ) {
+ throw new RevisionAccessException(
+ "Cross-wiki content loading is not supported by the pre-MCR schema"
+ );
+ }
+ }
+
public function setLogger( LoggerInterface $logger ) {
$this->logger = $logger;
}
}
/**
- * @return LoadBalancer
+ * @return ILoadBalancer
*/
private function getDBLoadBalancer() {
return $this->loadBalancer;
return $lb->getConnection( $mode, [], $this->wikiId );
}
+ /**
+ * @param int $queryFlags a bit field composed of READ_XXX flags
+ *
+ * @return DBConnRef
+ */
+ private function getDBConnectionRefForQueryFlags( $queryFlags ) {
+ list( $mode, ) = DBAccessObjectUtils::getDBOptions( $queryFlags );
+ return $this->getDBConnectionRef( $mode );
+ }
+
/**
* @param IDatabase $connection
*/
$this->failOnNull( $user->getId(), 'user field' );
$this->failOnEmpty( $user->getName(), 'user_text field' );
+ if ( !$rev->isReadyForInsertion() ) {
+ // This is here for future-proofing. At the time this check being added, it
+ // was redundant to the individual checks above.
+ throw new IncompleteRevisionException( 'Revision is incomplete' );
+ }
+
// TODO: we shouldn't need an actual Title here.
$title = Title::newFromLinkTarget( $rev->getPageAsLinkTarget() );
$pageId = $this->failOnEmpty( $rev->getPageId(), 'rev_page field' ); // check this early
$slot = $rev->getSlot( $role, RevisionRecord::RAW );
Assert::postcondition(
$slot->getContent() !== null,
- $role . ' slot must have content'
+ $role . ' slot must have content'
);
Assert::postcondition(
$slot->hasRevision(),
- $role . ' slot must have a revision associated'
+ $role . ' slot must have a revision associated'
);
}
foreach ( $slotRoles as $role ) {
$slot = $rev->getSlot( $role, RevisionRecord::RAW );
- if ( $slot->hasRevision() ) {
- // If the SlotRecord already has a revision ID set, this means it already exists
- // in the database, and should already belong to the current revision.
+ // If the SlotRecord already has a revision ID set, this means it already exists
+ // in the database, and should already belong to the current revision.
+ // However, a slot may already have a revision, but no content ID, if the slot
+ // is emulated based on the archive table, because we are in SCHEMA_COMPAT_READ_OLD
+ // mode, and the respective archive row was not yet migrated to the new schema.
+ // In that case, a new slot row (and content row) must be inserted even during
+ // undeletion.
+ if ( $slot->hasRevision() && $slot->hasContentId() ) {
// TODO: properly abort transaction if the assertion fails!
Assert::parameter(
$slot->getRevision() === $revisionId,
* @param IDatabase $dbw
* @param int $revisionId
* @param string &$blobAddress (may change!)
+ *
+ * @return int the text row id
*/
private function updateRevisionTextId( IDatabase $dbw, $revisionId, &$blobAddress ) {
$textId = $this->blobStore->getTextIdFromAddress( $blobAddress );
[ 'rev_id' => $revisionId ],
__METHOD__
);
+
+ return $textId;
}
/**
$blobAddress = $this->storeContentBlob( $protoSlot, $title, $blobHints );
}
+ $contentId = null;
+
// Write the main slot's text ID to the revision table for backwards compatibility
if ( $protoSlot->getRole() === 'main'
&& $this->hasMcrSchemaFlags( SCHEMA_COMPAT_WRITE_OLD )
) {
- $this->updateRevisionTextId( $dbw, $revisionId, $blobAddress );
+ // If SCHEMA_COMPAT_WRITE_NEW is also set, the fake content ID is overwritten
+ // with the real content ID below.
+ $textId = $this->updateRevisionTextId( $dbw, $revisionId, $blobAddress );
+ $contentId = $this->emulateContentId( $textId );
}
if ( $this->hasMcrSchemaFlags( SCHEMA_COMPAT_WRITE_NEW ) ) {
}
$this->insertSlotRowOn( $protoSlot, $dbw, $revisionId, $contentId );
- } else {
- $contentId = null;
}
$savedSlot = SlotRecord::newSaved(
if ( !isset( $revisionRow['rev_id'] ) ) {
// only if auto-increment was used
$revisionRow['rev_id'] = intval( $dbw->insertId() );
+
+ if ( $dbw->getType() === 'mysql' ) {
+ // (T202032) MySQL until 8.0 and MariaDB until some version after 10.1.34 don't save the
+ // auto-increment value to disk, so on server restart it might reuse IDs from deleted
+ // revisions. We can fix that with an insert with an explicit rev_id value, if necessary.
+
+ $maxRevId = intval( $dbw->selectField( 'archive', 'MAX(ar_rev_id)', '', __METHOD__ ) );
+ $table = 'archive';
+ if ( $this->hasMcrSchemaFlags( SCHEMA_COMPAT_WRITE_NEW ) ) {
+ $maxRevId2 = intval( $dbw->selectField( 'slots', 'MAX(slot_revision_id)', '', __METHOD__ ) );
+ if ( $maxRevId2 >= $maxRevId ) {
+ $maxRevId = $maxRevId2;
+ $table = 'slots';
+ }
+ }
+
+ if ( $maxRevId >= $revisionRow['rev_id'] ) {
+ $this->logger->debug(
+ '__METHOD__: Inserted revision {revid} but {table} has revisions up to {maxrevid}.'
+ . ' Trying to fix it.',
+ [
+ 'revid' => $revisionRow['rev_id'],
+ 'table' => $table,
+ 'maxrevid' => $maxRevId,
+ ]
+ );
+
+ if ( !$dbw->lock( 'fix-for-T202032', __METHOD__ ) ) {
+ throw new MWException( 'Failed to get database lock for T202032' );
+ }
+ $fname = __METHOD__;
+ $dbw->onTransactionResolution( function ( $trigger, $dbw ) use ( $fname ) {
+ $dbw->unlock( 'fix-for-T202032', $fname );
+ } );
+
+ $dbw->delete( 'revision', [ 'rev_id' => $revisionRow['rev_id'] ], __METHOD__ );
+
+ // The locking here is mostly to make MySQL bypass the REPEATABLE-READ transaction
+ // isolation (weird MySQL "feature"). It does seem to block concurrent auto-incrementing
+ // inserts too, though, at least on MariaDB 10.1.29.
+ //
+ // Don't try to lock `revision` in this way, it'll deadlock if there are concurrent
+ // transactions in this code path thanks to the row lock from the original ->insert() above.
+ //
+ // And we have to use raw SQL to bypass the "aggregation used with a locking SELECT" warning
+ // that's for non-MySQL DBs.
+ $row1 = $dbw->query(
+ $dbw->selectSqlText( 'archive', [ 'v' => "MAX(ar_rev_id)" ], '', __METHOD__ ) . ' FOR UPDATE'
+ )->fetchObject();
+ if ( $this->hasMcrSchemaFlags( SCHEMA_COMPAT_WRITE_NEW ) ) {
+ $row2 = $dbw->query(
+ $dbw->selectSqlText( 'slots', [ 'v' => "MAX(slot_revision_id)" ], '', __METHOD__ )
+ . ' FOR UPDATE'
+ )->fetchObject();
+ } else {
+ $row2 = null;
+ }
+ $maxRevId = max(
+ $maxRevId,
+ $row1 ? intval( $row1->v ) : 0,
+ $row2 ? intval( $row2->v ) : 0
+ );
+
+ // If we don't have SCHEMA_COMPAT_WRITE_NEW, all except the first of any concurrent
+ // transactions will throw a duplicate key error here. It doesn't seem worth trying
+ // to avoid that.
+ $revisionRow['rev_id'] = $maxRevId + 1;
+ $dbw->insert( 'revision', $revisionRow, __METHOD__ );
+ }
+ }
}
$commentCallback( $revisionRow['rev_id'] );
// MCR migration note: rev_content_model and rev_content_format will go away
if ( $this->contentHandlerUseDB ) {
+ $this->assertCrossWikiContentLoadingIsSafe();
+
$defaultModel = ContentHandler::getDefaultModelFor( $title );
$defaultFormat = ContentHandler::getForModelID( $defaultModel )->getDefaultFormat();
// if $wgContentHandlerUseDB is not set,
// all revisions must use the default content model and format.
+ $this->assertCrossWikiContentLoadingIsSafe();
+
$defaultModel = ContentHandler::getDefaultModelFor( $title );
$defaultHandler = ContentHandler::getForModelID( $defaultModel );
$defaultFormat = $defaultHandler->getDefaultFormat();
* Such revisions can for instance identify page rename
* operations and other such meta-modifications.
*
- * @note: This method grabs a FOR UPDATE lock on the relevant row of the page table,
+ * @note This method grabs a FOR UPDATE lock on the relevant row of the page table,
* to prevent a new revision from being inserted before the null revision has been written
* to the database.
*
return null;
}
- // Fetch the actual revision row, without locking all extra tables.
- $oldRevision = $this->loadRevisionFromId( $dbw, $pageLatest );
+ // Fetch the actual revision row from master, without locking all extra tables.
+ $oldRevision = $this->loadRevisionFromConds(
+ $dbw,
+ [ 'rev_id' => intval( $pageLatest ) ],
+ self::READ_LATEST,
+ $title
+ );
// Construct the new revision
$timestamp = wfTimestampNow(); // TODO: use a callback, so we can override it for testing.
* @return null|RecentChange
*/
public function getRecentChange( RevisionRecord $rev, $flags = 0 ) {
- $dbr = $this->getDBConnection( DB_REPLICA );
-
list( $dbType, ) = DBAccessObjectUtils::getDBOptions( $flags );
+ $db = $this->getDBConnection( $dbType );
$userIdentity = $rev->getUser( RevisionRecord::RAW );
}
// TODO: Select by rc_this_oldid alone - but as of Nov 2017, there is no index on that!
- $actorWhere = $this->actorMigration->getWhere( $dbr, 'rc_user', $rev->getUser(), false );
+ $actorWhere = $this->actorMigration->getWhere( $db, 'rc_user', $rev->getUser(), false );
$rc = RecentChange::newFromConds(
[
$actorWhere['conds'],
- 'rc_timestamp' => $dbr->timestamp( $rev->getTimestamp() ),
+ 'rc_timestamp' => $db->timestamp( $rev->getTimestamp() ),
'rc_this_oldid' => $rev->getId()
],
__METHOD__,
$dbType
);
- $this->releaseDBConnection( $dbr );
+ $this->releaseDBConnection( $db );
// XXX: cache this locally? Glue it to the RevisionRecord?
return $rc;
$mainSlotRow->role_name = 'main';
$mainSlotRow->model_name = null;
$mainSlotRow->slot_revision_id = null;
+ $mainSlotRow->slot_content_id = null;
$mainSlotRow->content_address = null;
$content = null;
if ( !property_exists( $row, 'old_flags' ) ) {
throw new InvalidArgumentException( 'old_flags was not set in $row' );
}
- $blobFlags = ( $row->old_flags === null ) ? '' : $row->old_flags;
+ $blobFlags = $row->old_flags ?? '';
}
$mainSlotRow->slot_revision_id = intval( $row->rev_id );
$mainSlotRow->format_name = isset( $row->rev_content_format )
? strval( $row->rev_content_format )
: null;
+
+ if ( isset( $row->rev_text_id ) && intval( $row->rev_text_id ) > 0 ) {
+ // Overwritten below for SCHEMA_COMPAT_WRITE_NEW
+ $mainSlotRow->slot_content_id
+ = $this->emulateContentId( intval( $row->rev_text_id ) );
+ }
} elseif ( is_array( $row ) ) {
$mainSlotRow->slot_revision_id = isset( $row['id'] ) ? intval( $row['id'] ) : null;
$mainSlotRow->format_name = $handler->getDefaultFormat();
}
}
+
+ if ( isset( $row['text_id'] ) && intval( $row['text_id'] ) > 0 ) {
+ // Overwritten below for SCHEMA_COMPAT_WRITE_NEW
+ $mainSlotRow->slot_content_id
+ = $this->emulateContentId( intval( $row['text_id'] ) );
+ }
} else {
throw new MWException( 'Revision constructor passed invalid row format.' );
}
if ( $mainSlotRow->model_name === null ) {
$mainSlotRow->model_name = function ( SlotRecord $slot ) use ( $title ) {
+ $this->assertCrossWikiContentLoadingIsSafe();
+
// TODO: MCR: consider slot role in getDefaultModelFor()! Use LinkTarget!
// TODO: MCR: deprecate $title->getModel().
return ContentHandler::getDefaultModelFor( $title );
};
}
- // NOTE: this callback will be looped through RevisionSlot::newInherited(), allowing
- // the inherited slot to have the same content_id as the original slot. In that case,
- // $slot will be the inherited slot, while $mainSlotRow still refers to the original slot.
- $mainSlotRow->slot_content_id =
- function ( SlotRecord $slot ) use ( $queryFlags, $mainSlotRow ) {
- list( $dbMode, ) = DBAccessObjectUtils::getDBOptions( $queryFlags );
- $db = $this->getDBConnectionRef( $dbMode );
- return $this->findSlotContentId( $db, $mainSlotRow->slot_revision_id, 'main' );
- };
+ if ( $this->hasMcrSchemaFlags( SCHEMA_COMPAT_WRITE_NEW ) ) {
+ // NOTE: this callback will be looped through RevisionSlot::newInherited(), allowing
+ // the inherited slot to have the same content_id as the original slot. In that case,
+ // $slot will be the inherited slot, while $mainSlotRow still refers to the original slot.
+ $mainSlotRow->slot_content_id =
+ function ( SlotRecord $slot ) use ( $queryFlags, $mainSlotRow ) {
+ $db = $this->getDBConnectionRefForQueryFlags( $queryFlags );
+ return $this->findSlotContentId( $db, $mainSlotRow->slot_revision_id, 'main' );
+ };
+ }
return new SlotRecord( $mainSlotRow, $content );
}
+ /**
+ * Provides a content ID to use with emulated SlotRecords in SCHEMA_COMPAT_OLD mode,
+ * based on the revision's text ID (rev_text_id or ar_text_id, respectively).
+ * Note that in SCHEMA_COMPAT_WRITE_BOTH, a callback to findSlotContentId() should be used
+ * instead, since in that mode, some revision rows may already have a real content ID,
+ * while other's don't - and for the ones that don't, we should indicate that it
+ * is missing and cause SlotRecords::hasContentId() to return false.
+ *
+ * @param int $textId
+ * @return int The emulated content ID
+ */
+ private function emulateContentId( $textId ) {
+ // Return a negative number to ensure the ID is distinct from any real content IDs
+ // that will be assigned in SCHEMA_COMPAT_WRITE_NEW mode and read in SCHEMA_COMPAT_READ_NEW
+ // mode.
+ return -$textId;
+ }
+
/**
* Loads a Content object based on a slot row.
*
// and fall back to master. The assumption is that we only want to force the fallback
// if we are quite sure the revision exists because the caller supplied a revision ID.
// If the page isn't found at all on a replica, it probably simply does not exist.
- $db = $this->getDBConnection( ( $flags & self::READ_LATEST ) ? DB_MASTER : DB_REPLICA );
+ $db = $this->getDBConnectionRefForQueryFlags( $flags );
$conds[] = 'rev_id=page_latest';
$rev = $this->loadRevisionFromConds( $db, $conds, $flags );
- $this->releaseDBConnection( $db );
return $rev;
}
}
// and fall back to master. The assumption is that we only want to force the fallback
// if we are quite sure the revision exists because the caller supplied a revision ID.
// If the page isn't found at all on a replica, it probably simply does not exist.
- $db = $this->getDBConnection( ( $flags & self::READ_LATEST ) ? DB_MASTER : DB_REPLICA );
+ $db = $this->getDBConnectionRefForQueryFlags( $flags );
$conds[] = 'rev_id=page_latest';
$rev = $this->loadRevisionFromConds( $db, $conds, $flags );
- $this->releaseDBConnection( $db );
return $rev;
}
}
$slots = [];
foreach ( $res as $row ) {
+ // resolve role names and model names from in-memory cache, instead of joining.
+ $row->role_name = $this->slotRoleStore->getName( (int)$row->slot_role_id );
+ $row->model_name = $this->contentModelStore->getName( (int)$row->content_model );
+
$contentCallback = function ( SlotRecord $slot ) use ( $queryFlags, $row ) {
return $this->loadSlotContent( $slot, null, null, null, $queryFlags );
};
$user = new UserIdentityValue( 0, '', 0 );
}
- $comment = $this->commentStore
- // Legacy because $row may have come from self::selectFields()
- ->getCommentLegacy( $this->getDBConnection( DB_REPLICA ), 'ar_comment', $row, true );
+ $db = $this->getDBConnectionRefForQueryFlags( $queryFlags );
+ // Legacy because $row may have come from self::selectFields()
+ $comment = $this->commentStore->getCommentLegacy( $db, 'ar_comment', $row, true );
$slots = $this->newRevisionSlots( $row->ar_rev_id, $row, $queryFlags, $title );
$user = new UserIdentityValue( 0, '', 0 );
}
- $comment = $this->commentStore
- // Legacy because $row may have come from self::selectFields()
- ->getCommentLegacy( $this->getDBConnection( DB_REPLICA ), 'rev_comment', $row, true );
+ $db = $this->getDBConnectionRefForQueryFlags( $queryFlags );
+ // Legacy because $row may have come from self::selectFields()
+ $comment = $this->commentStore->getCommentLegacy( $db, 'rev_comment', $row, true );
$slots = $this->newRevisionSlots( $row->rev_id, $row, $queryFlags, $title );
* @return RevisionRecord|null
*/
private function newRevisionFromConds( $conditions, $flags = 0, Title $title = null ) {
- $db = $this->getDBConnection( ( $flags & self::READ_LATEST ) ? DB_MASTER : DB_REPLICA );
+ $db = $this->getDBConnectionRefForQueryFlags( $flags );
$rev = $this->loadRevisionFromConds( $db, $conditions, $flags, $title );
- $this->releaseDBConnection( $db );
$lb = $this->getDBLoadBalancer();
&& $lb->hasOrMadeRecentMasterChanges()
) {
$flags = self::READ_LATEST;
- $db = $this->getDBConnection( DB_MASTER );
- $rev = $this->loadRevisionFromConds( $db, $conditions, $flags, $title );
- $this->releaseDBConnection( $db );
+ $dbw = $this->getDBConnection( DB_MASTER );
+ $rev = $this->loadRevisionFromConds( $dbw, $conditions, $flags, $title );
+ $this->releaseDBConnection( $dbw );
}
return $rev;
// NOTE: even when this class is set to not read from the old schema, callers
// should still be able to join against the text table, as long as we are still
// writing the old schema for compatibility.
- wfDeprecated( __METHOD__ . ' with `text` option', '1.32' );
+ // TODO: This should trigger a deprecation warning eventually (T200918), but not
+ // before all known usages are removed (see T198341 and T201164).
+ // wfDeprecated( __METHOD__ . ' with `text` option', '1.32' );
}
$ret['tables'][] = 'text';
*
* @param array $options Any combination of the following strings
* - 'content': Join with the content table, and select content meta-data fields
+ * - 'model': Join with the content_models table, and select the model_name field.
+ * Only applicable if 'content' is also set.
+ * - 'role': Join with the slot_roles table, and select the role_name field
*
* @return array With three keys:
* - tables: (string[]) to include in the `$table` to `IDatabase->select()`
}
} else {
$ret['tables'][] = 'slots';
- $ret['tables'][] = 'slot_roles';
$ret['fields'] = array_merge( $ret['fields'], [
'slot_revision_id',
'slot_content_id',
'slot_origin',
- 'role_name'
+ 'slot_role_id',
] );
- $ret['joins']['slot_roles'] = [ 'INNER JOIN', [ 'slot_role_id = role_id' ] ];
+
+ if ( in_array( 'role', $options, true ) ) {
+ // Use left join to attach role name, so we still find the revision row even
+ // if the role name is missing. This triggers a more obvious failure mode.
+ $ret['tables'][] = 'slot_roles';
+ $ret['joins']['slot_roles'] = [ 'LEFT JOIN', [ 'slot_role_id = role_id' ] ];
+ $ret['fields'][] = 'role_name';
+ }
if ( in_array( 'content', $options, true ) ) {
$ret['tables'][] = 'content';
- $ret['tables'][] = 'content_models';
$ret['fields'] = array_merge( $ret['fields'], [
'content_size',
'content_sha1',
'content_address',
- 'model_name'
+ 'content_model',
] );
$ret['joins']['content'] = [ 'INNER JOIN', [ 'slot_content_id = content_id' ] ];
- $ret['joins']['content_models'] = [ 'INNER JOIN', [ 'content_model = model_id' ] ];
+
+ if ( in_array( 'model', $options, true ) ) {
+ // Use left join to attach model name, so we still find the revision row even
+ // if the model name is missing. This triggers a more obvious failure mode.
+ $ret['tables'][] = 'content_models';
+ $ret['joins']['content_models'] = [ 'LEFT JOIN', [ 'content_model = model_id' ] ];
+ $ret['fields'][] = 'model_name';
+ }
+
}
}
* @return string|bool False if not found
*/
public function getTimestampFromId( $title, $id, $flags = 0 ) {
- $db = $this->getDBConnection(
- ( $flags & IDBAccessObject::READ_LATEST ) ? DB_MASTER : DB_REPLICA
- );
+ $db = $this->getDBConnectionRefForQueryFlags( $flags );
$conds = [ 'rev_id' => $id ];
$conds['rev_page'] = $title->getArticleID();
$timestamp = $db->selectField( 'revision', 'rev_timestamp', $conds, __METHOD__ );
- $this->releaseDBConnection( $db );
return ( $timestamp !== false ) ? wfTimestamp( TS_MW, $timestamp ) : false;
}