Merge "Add LinksUpdate::getRevision()"
[lhc/web/wiklou.git] / includes / deferred / LinksUpdate.php
index 4215ed0..07b5614 100644 (file)
  */
 
 /**
- * See docs/deferred.txt
+ * Class the manages updates of *_link tables as well as similar extension-managed tables
+ *
+ * @note: LinksUpdate is managed by DeferredUpdates::execute(). Do not run this in a transaction.
  *
- * @todo document (e.g. one-sentence top-level class description).
+ * See docs/deferred.txt
  */
 class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
        // @todo make members protected, but make sure extensions don't break
@@ -55,6 +57,9 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
        /** @var array Map of language codes to titles */
        public $mInterlangs;
 
+       /** @var array 2-D map of (prefix => DBK => 1) */
+       public $mInterwikis;
+
        /** @var array Map of arbitrary name to value */
        public $mProperties;
 
@@ -88,7 +93,8 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
         * @throws MWException
         */
        function __construct( Title $title, ParserOutput $parserOutput, $recursive = true ) {
-               parent::__construct( false ); // no implicit transaction
+               // Implicit transactions are disabled as they interfere with batching
+               parent::__construct( false );
 
                $this->mTitle = $title;
                $this->mId = $title->getArticleID( Title::GAID_FOR_UPDATE );
@@ -138,16 +144,46 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
 
        /**
         * Update link tables with outgoing links from an updated article
+        *
+        * @note: this is managed by DeferredUpdates::execute(). Do not run this in a transaction.
         */
        public function doUpdate() {
+               // Make sure all links update threads see the changes of each other.
+               // This handles the case when updates have to batched into several COMMITs.
+               $scopedLock = self::acquirePageLock( $this->mDb, $this->mId );
+
                Hooks::run( 'LinksUpdate', [ &$this ] );
                $this->doIncrementalUpdate();
 
-               $this->mDb->onTransactionIdle( function() {
+               $this->mDb->onTransactionIdle( function() use ( &$scopedLock ) {
                        Hooks::run( 'LinksUpdateComplete', [ &$this ] );
+                       // Release the lock *after* the final COMMIT for correctness
+                       ScopedCallback::consume( $scopedLock );
                } );
        }
 
+       /**
+        * Acquire a lock for performing link table updates for a page on a DB
+        *
+        * @param IDatabase $dbw
+        * @param integer $pageId
+        * @return ScopedCallback|null Returns null on failure
+        * @throws RuntimeException
+        * @since 1.27
+        */
+       public static function acquirePageLock( IDatabase $dbw, $pageId ) {
+               $scopedLock = $dbw->getScopedLockAndFlush(
+                       "LinksUpdate:pageid:$pageId",
+                       __METHOD__,
+                       15
+               );
+               if ( !$scopedLock ) {
+                       throw new RuntimeException( "Could not acquire lock on page #$pageId." );
+               }
+
+               return $scopedLock;
+       }
+
        protected function doIncrementalUpdate() {
                # Page links
                $existing = $this->getExistingLinks();
@@ -157,7 +193,6 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
 
                # Image links
                $existing = $this->getExistingImages();
-
                $imageDeletes = $this->getImageDeletions( $existing );
                $this->incrTableUpdate( 'imagelinks', 'il', $imageDeletes,
                        $this->getImageInsertions( $existing ) );
@@ -188,9 +223,7 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
 
                # Category links
                $existing = $this->getExistingCategories();
-
                $categoryDeletes = $this->getCategoryDeletions( $existing );
-
                $this->incrTableUpdate( 'categorylinks', 'cl', $categoryDeletes,
                        $this->getCategoryInsertions( $existing ) );
 
@@ -202,9 +235,7 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
 
                # Page properties
                $existing = $this->getExistingProperties();
-
                $propertiesDeletes = $this->getPropertyDeletions( $existing );
-
                $this->incrTableUpdate( 'page_props', 'pp', $propertiesDeletes,
                        $this->getPropertyInsertions( $existing ) );
 
@@ -304,44 +335,71 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
         * @param array $deletions
         * @param array $insertions Rows to insert
         */
-       function incrTableUpdate( $table, $prefix, $deletions, $insertions ) {
-               if ( $table == 'page_props' ) {
+       private function incrTableUpdate( $table, $prefix, $deletions, $insertions ) {
+               $bSize = RequestContext::getMain()->getConfig()->get( 'UpdateRowsPerQuery' );
+
+               if ( $table === 'page_props' ) {
                        $fromField = 'pp_page';
                } else {
                        $fromField = "{$prefix}_from";
                }
-               $where = [ $fromField => $this->mId ];
-               if ( $table == 'pagelinks' || $table == 'templatelinks' || $table == 'iwlinks' ) {
-                       if ( $table == 'iwlinks' ) {
-                               $baseKey = 'iwl_prefix';
-                       } else {
-                               $baseKey = "{$prefix}_namespace";
+
+               $deleteWheres = []; // list of WHERE clause arrays for each DB delete() call
+               if ( $table === 'pagelinks' || $table === 'templatelinks' || $table === 'iwlinks' ) {
+                       $baseKey =  ( $table === 'iwlinks' ) ? 'iwl_prefix' : "{$prefix}_namespace";
+
+                       $curBatchSize = 0;
+                       $curDeletionBatch = [];
+                       $deletionBatches = [];
+                       foreach ( $deletions as $ns => $dbKeys ) {
+                               foreach ( $dbKeys as $dbKey => $unused ) {
+                                       $curDeletionBatch[$ns][$dbKey] = 1;
+                                       if ( ++$curBatchSize >= $bSize ) {
+                                               $deletionBatches[] = $curDeletionBatch;
+                                               $curDeletionBatch = [];
+                                               $curBatchSize = 0;
+                                       }
+                               }
                        }
-                       $clause = $this->mDb->makeWhereFrom2d( $deletions, $baseKey, "{$prefix}_title" );
-                       if ( $clause ) {
-                               $where[] = $clause;
-                       } else {
-                               $where = false;
+                       if ( $curDeletionBatch ) {
+                               $deletionBatches[] = $curDeletionBatch;
+                       }
+
+                       foreach ( $deletionBatches as $deletionBatch ) {
+                               $deleteWheres[] = [
+                                       $fromField => $this->mId,
+                                       $this->mDb->makeWhereFrom2d( $deletionBatch, $baseKey, "{$prefix}_title" )
+                               ];
                        }
                } else {
-                       if ( $table == 'langlinks' ) {
+                       if ( $table === 'langlinks' ) {
                                $toField = 'll_lang';
-                       } elseif ( $table == 'page_props' ) {
+                       } elseif ( $table === 'page_props' ) {
                                $toField = 'pp_propname';
                        } else {
                                $toField = $prefix . '_to';
                        }
-                       if ( count( $deletions ) ) {
-                               $where[$toField] = array_keys( $deletions );
-                       } else {
-                               $where = false;
+
+                       $deletionBatches = array_chunk( array_keys( $deletions ), $bSize );
+                       foreach ( $deletionBatches as $deletionBatch ) {
+                               $deleteWheres[] = [ $fromField => $this->mId, $toField => $deletionBatch ];
                        }
                }
-               if ( $where ) {
-                       $this->mDb->delete( $table, $where, __METHOD__ );
+
+               foreach ( $deleteWheres as $deleteWhere ) {
+                       $this->mDb->delete( $table, $deleteWhere, __METHOD__ );
+                       $this->mDb->commit( __METHOD__, 'flush' );
+                       wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
                }
+
+               $insertBatches = array_chunk( $insertions, $bSize );
+               foreach ( $insertBatches as $insertBatch ) {
+                       $this->mDb->insert( $table, $insertBatch, __METHOD__, 'IGNORE' );
+                       $this->mDb->commit( __METHOD__, 'flush' );
+                       wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+               }
+
                if ( count( $insertions ) ) {
-                       $this->mDb->insert( $table, $insertions, __METHOD__, 'IGNORE' );
                        Hooks::run( 'LinksUpdateAfterInsert', [ $this, $table, $insertions ] );
                }
        }
@@ -875,6 +933,14 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
                $this->mRevision = $revision;
        }
 
+       /**
+        * @since 1.28
+        * @return null|Revision
+        */
+       public function getRevision() {
+               return $this->mRevision;
+       }
+
        /**
         * Set the User who triggered this LinksUpdate
         *