Batch links updates performed by refreshLinks jobs
authorAaron Schulz <aschulz@wikimedia.org>
Wed, 4 May 2016 22:05:03 +0000 (15:05 -0700)
committerKrinkle <krinklemail@gmail.com>
Fri, 13 May 2016 18:31:51 +0000 (18:31 +0000)
This should avoid erratic lag spikes that happen as many links are
added and removed via new pages (sometimes bot generated) and edits
that blank pages as well as their reversions.

In the common cases of a modest number of link changes, the entire
update will still happen in one transaction. In any case, link updates
now use a lock to avoid clobbering each other on the same page.

Bug: T109943
Change-Id: Icd453fcc3d28342065893260ad327eae11870245

includes/deferred/LinksDeletionUpdate.php
includes/deferred/LinksUpdate.php

index 1770639..65a8c0e 100644 (file)
@@ -49,6 +49,9 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate
        public function doUpdate() {
                # Page may already be deleted, so don't just getId()
                $id = $this->pageId;
+               // Make sure all links update threads see the changes of each other.
+               // This handles the case when updates have to batched into several COMMITs.
+               $scopedLock = LinksUpdate::acquirePageLock( $this->mDb, $id );
 
                # Delete restrictions for it
                $this->mDb->delete( 'page_restrictions', [ 'pr_page' => $id ], __METHOD__ );
@@ -101,6 +104,11 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate
                                $this->mDb->delete( 'recentchanges', [ 'rc_id' => $rcIds ], __METHOD__ );
                        }
                }
+
+               $this->mDb->onTransactionIdle( function() use ( &$scopedLock ) {
+                       // Release the lock *after* the final COMMIT for correctness
+                       ScopedCallback::consume( $scopedLock );
+               } );
        }
 
        public function getAsJobSpecification() {
index c0205be..ac08374 100644 (file)
  */
 
 /**
- * See docs/deferred.txt
+ * Class the manages updates of *_link tables as well as similar extension-managed tables
+ *
+ * @note: LinksUpdate is managed by DeferredUpdates::execute(). Do not run this in a transaction.
  *
- * @todo document (e.g. one-sentence top-level class description).
+ * See docs/deferred.txt
  */
 class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
        // @todo make members protected, but make sure extensions don't break
@@ -82,6 +84,8 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
         */
        private $user;
 
+       const BATCH_SIZE = 500; // try to keep typical updates in a single transaction
+
        /**
         * Constructor
         *
@@ -91,7 +95,8 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
         * @throws MWException
         */
        function __construct( Title $title, ParserOutput $parserOutput, $recursive = true ) {
-               parent::__construct( false ); // no implicit transaction
+               // Implicit transactions are disabled as they interfere with batching
+               parent::__construct( false );
 
                $this->mTitle = $title;
                $this->mId = $title->getArticleID( Title::GAID_FOR_UPDATE );
@@ -141,16 +146,46 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
 
        /**
         * Update link tables with outgoing links from an updated article
+        *
+        * @note: this is managed by DeferredUpdates::execute(). Do not run this in a transaction.
         */
        public function doUpdate() {
+               // Make sure all links update threads see the changes of each other.
+               // This handles the case when updates have to batched into several COMMITs.
+               $scopedLock = self::acquirePageLock( $this->mDb, $this->mId );
+
                Hooks::run( 'LinksUpdate', [ &$this ] );
                $this->doIncrementalUpdate();
 
-               $this->mDb->onTransactionIdle( function() {
+               $this->mDb->onTransactionIdle( function() use ( &$scopedLock ) {
                        Hooks::run( 'LinksUpdateComplete', [ &$this ] );
+                       // Release the lock *after* the final COMMIT for correctness
+                       ScopedCallback::consume( $scopedLock );
                } );
        }
 
+       /**
+        * Acquire a lock for performing link table updates for a page on a DB
+        *
+        * @param IDatabase $dbw
+        * @param integer $pageId
+        * @return ScopedCallback|null Returns null on failure
+        * @throws RuntimeException
+        * @since 1.27
+        */
+       public static function acquirePageLock( IDatabase $dbw, $pageId ) {
+               $scopedLock = $dbw->getScopedLockAndFlush(
+                       "LinksUpdate:pageid:$pageId",
+                       __METHOD__,
+                       15
+               );
+               if ( !$scopedLock ) {
+                       throw new RuntimeException( "Could not acquire lock on page #$pageId." );
+               }
+
+               return $scopedLock;
+       }
+
        protected function doIncrementalUpdate() {
                # Page links
                $existing = $this->getExistingLinks();
@@ -160,7 +195,6 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
 
                # Image links
                $existing = $this->getExistingImages();
-
                $imageDeletes = $this->getImageDeletions( $existing );
                $this->incrTableUpdate( 'imagelinks', 'il', $imageDeletes,
                        $this->getImageInsertions( $existing ) );
@@ -191,9 +225,7 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
 
                # Category links
                $existing = $this->getExistingCategories();
-
                $categoryDeletes = $this->getCategoryDeletions( $existing );
-
                $this->incrTableUpdate( 'categorylinks', 'cl', $categoryDeletes,
                        $this->getCategoryInsertions( $existing ) );
 
@@ -205,9 +237,7 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
 
                # Page properties
                $existing = $this->getExistingProperties();
-
                $propertiesDeletes = $this->getPropertyDeletions( $existing );
-
                $this->incrTableUpdate( 'page_props', 'pp', $propertiesDeletes,
                        $this->getPropertyInsertions( $existing ) );
 
@@ -307,44 +337,69 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
         * @param array $deletions
         * @param array $insertions Rows to insert
         */
-       function incrTableUpdate( $table, $prefix, $deletions, $insertions ) {
-               if ( $table == 'page_props' ) {
+       private function incrTableUpdate( $table, $prefix, $deletions, $insertions ) {
+               if ( $table === 'page_props' ) {
                        $fromField = 'pp_page';
                } else {
                        $fromField = "{$prefix}_from";
                }
-               $where = [ $fromField => $this->mId ];
-               if ( $table == 'pagelinks' || $table == 'templatelinks' || $table == 'iwlinks' ) {
-                       if ( $table == 'iwlinks' ) {
-                               $baseKey = 'iwl_prefix';
-                       } else {
-                               $baseKey = "{$prefix}_namespace";
+
+               $deleteWheres = []; // list of WHERE clause arrays for each DB delete() call
+               if ( $table === 'pagelinks' || $table === 'templatelinks' || $table === 'iwlinks' ) {
+                       $baseKey =  ( $table === 'iwlinks' ) ? 'iwl_prefix' : "{$prefix}_namespace";
+
+                       $curBatchSize = 0;
+                       $curDeletionBatch = [];
+                       $deletionBatches = [];
+                       foreach ( $deletions as $ns => $dbKeys ) {
+                               foreach ( $dbKeys as $dbKey => $unused ) {
+                                       $curDeletionBatch[$ns][$dbKey] = 1;
+                                       if ( ++$curBatchSize >= self::BATCH_SIZE ) {
+                                               $deletionBatches[] = $curDeletionBatch;
+                                               $curDeletionBatch = [];
+                                               $curBatchSize = 0;
+                                       }
+                               }
                        }
-                       $clause = $this->mDb->makeWhereFrom2d( $deletions, $baseKey, "{$prefix}_title" );
-                       if ( $clause ) {
-                               $where[] = $clause;
-                       } else {
-                               $where = false;
+                       if ( $curDeletionBatch ) {
+                               $deletionBatches[] = $curDeletionBatch;
+                       }
+
+                       foreach ( $deletionBatches as $deletionBatch ) {
+                               $deleteWheres[] = [
+                                       $fromField => $this->mId,
+                                       $this->mDb->makeWhereFrom2d( $deletionBatch, $baseKey, "{$prefix}_title" )
+                               ];
                        }
                } else {
-                       if ( $table == 'langlinks' ) {
+                       if ( $table === 'langlinks' ) {
                                $toField = 'll_lang';
-                       } elseif ( $table == 'page_props' ) {
+                       } elseif ( $table === 'page_props' ) {
                                $toField = 'pp_propname';
                        } else {
                                $toField = $prefix . '_to';
                        }
-                       if ( count( $deletions ) ) {
-                               $where[$toField] = array_keys( $deletions );
-                       } else {
-                               $where = false;
+
+                       $deletionBatches = array_chunk( array_keys( $deletions ), self::BATCH_SIZE );
+                       foreach ( $deletionBatches as $deletionBatch ) {
+                               $deleteWheres[] = [ $fromField => $this->mId, $toField => $deletionBatch ];
                        }
                }
-               if ( $where ) {
-                       $this->mDb->delete( $table, $where, __METHOD__ );
+
+               foreach ( $deleteWheres as $deleteWhere ) {
+                       $this->mDb->delete( $table, $deleteWhere, __METHOD__ );
+                       $this->mDb->commit( __METHOD__, 'flush' );
+                       wfGetLBFactory()->waitForReplication();
+               }
+
+               $insertBatches = array_chunk( $insertions, self::BATCH_SIZE );
+               foreach ( $insertBatches as $insertBatch ) {
+                       $this->mDb->insert( $table, $insertBatch, __METHOD__, 'IGNORE' );
+                       $this->mDb->commit( __METHOD__, 'flush' );
+                       wfGetLBFactory()->waitForReplication();
                }
+
                if ( count( $insertions ) ) {
-                       $this->mDb->insert( $table, $insertions, __METHOD__, 'IGNORE' );
                        Hooks::run( 'LinksUpdateAfterInsert', [ $this, $table, $insertions ] );
                }
        }