Add convenience commitAndWaitForReplication() method
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 13 Aug 2016 02:56:21 +0000 (19:56 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 16 Aug 2016 22:09:17 +0000 (22:09 +0000)
This also does sanity checks to avoid breaking transactions

Change-Id: I7453c245eee25a26243e606970ef5f79b21a8141

includes/WatchedItemStore.php
includes/db/loadbalancer/LBFactory.php
includes/deferred/DataUpdate.php
includes/deferred/LinksDeletionUpdate.php
includes/deferred/LinksUpdate.php
includes/jobqueue/jobs/CategoryMembershipChangeJob.php
includes/jobqueue/jobs/DeleteLinksJob.php
includes/jobqueue/jobs/HTMLCacheUpdateJob.php
includes/jobqueue/jobs/RecentChangesUpdateJob.php
includes/jobqueue/jobs/RefreshLinksJob.php

index cf18fab..a13609b 100644 (file)
@@ -742,6 +742,7 @@ class WatchedItemStore implements StatsdAwareInterface {
 
                                        $dbw = $this->getConnection( DB_MASTER );
                                        $factory = wfGetLBFactory();
+                                       $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
 
                                        $watchersChunks = array_chunk( $watchers, $wgUpdateRowsPerQuery );
                                        foreach ( $watchersChunks as $watchersChunk ) {
@@ -755,8 +756,9 @@ class WatchedItemStore implements StatsdAwareInterface {
                                                        ], $fname
                                                );
                                                if ( count( $watchersChunks ) > 1 ) {
-                                                       $factory->commitMasterChanges( __METHOD__ );
-                                                       $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
+                                                       $factory->commitAndWaitForReplication(
+                                                               __METHOD__, $ticket, [ 'wiki' => $dbw->getWikiID() ]
+                                                       );
                                                }
                                        }
                                        $this->uncacheLinkTarget( $target );
index 4078a39..efc6148 100644 (file)
@@ -42,6 +42,8 @@ abstract class LBFactory implements DestructibleService {
        /** @var WANObjectCache */
        protected $wanCache;
 
+       /** @var mixed */
+       protected $ticket;
        /** @var string|bool Reason all LBs are read-only or false if not */
        protected $readOnlyReason = false;
 
@@ -72,6 +74,7 @@ abstract class LBFactory implements DestructibleService {
                        $this->wanCache = WANObjectCache::newEmpty();
                }
                $this->trxLogger = LoggerFactory::getInstance( 'DBTransaction' );
+               $this->ticket = mt_rand();
        }
 
        /**
@@ -404,6 +407,44 @@ abstract class LBFactory implements DestructibleService {
                }
        }
 
+       /**
+        * Get a token asserting that no transaction writes are active
+        *
+        * @param string $fname Caller name (e.g. __METHOD__)
+        * @return mixed A value to pass to commitAndWaitForReplication()
+        * @since 1.28
+        */
+       public function getEmptyTransactionTicket( $fname ) {
+               if ( $this->hasMasterChanges() ) {
+                       $this->trxLogger->error( __METHOD__ . ": $fname does not have outer scope." );
+                       return null;
+               }
+
+               return $this->ticket;
+       }
+
+       /**
+        * Convenience method for safely running commitMasterChanges()/waitForReplication()
+        *
+        * This will commit and wait unless $ticket indicates it is unsafe to do so
+        *
+        * @param string $fname Caller name (e.g. __METHOD__)
+        * @param mixed $ticket Result of getOuterTransactionScopeTicket()
+        * @param array $opts Options to waitForReplication()
+        * @throws DBReplicationWaitError
+        * @since 1.28
+        */
+       public function commitAndWaitForReplication( $fname, $ticket, array $opts = [] ) {
+               if ( $ticket !== $this->ticket ) {
+                       $logger = LoggerFactory::getInstance( 'DBPerformance' );
+                       $logger->error( __METHOD__ . ": cannot commit; $fname does not have outer scope." );
+                       return;
+               }
+
+               $this->commitMasterChanges( $fname );
+               $this->waitForReplication( $opts );
+       }
+
        /**
         * Disable the ChronologyProtector for all load balancers
         *
index 2865461..5b84ca9 100644 (file)
  *       subclasses can override the beginTransaction() and commitTransaction() methods.
  */
 abstract class DataUpdate implements DeferrableUpdate {
+       /** @var mixed Result from LBFactory::getEmptyTransactionTicket() */
+       protected $ticket;
+
        public function __construct() {
                // noop
        }
 
+       /**
+        * @param mixed $ticket Result of getEmptyTransactionTicket()
+        * @since 1.28
+        */
+       public function setTransactionTicket( $ticket ) {
+               $this->ticket = $ticket;
+       }
+
        /**
         * Begin an appropriate transaction, if any.
         * This default implementation does nothing.
index f96df48..47f2b21 100644 (file)
@@ -78,8 +78,9 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate
                foreach ( $catBatches as $catBatch ) {
                        $this->page->updateCategoryCounts( [], $catBatch, $id );
                        if ( count( $catBatches ) > 1 ) {
-                               $factory->commitMasterChanges( __METHOD__ );
-                               $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+                               $factory->commitAndWaitForReplication(
+                                       __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ]
+                               );
                        }
                }
 
@@ -174,8 +175,9 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate
                        foreach ( $rcIdBatches as $rcIdBatch ) {
                                $this->mDb->delete( 'recentchanges', [ 'rc_id' => $rcIdBatch ], __METHOD__ );
                                if ( count( $rcIdBatches ) > 1 ) {
-                                       $factory->commitMasterChanges( __METHOD__ );
-                                       $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+                                       $factory->commitAndWaitForReplication(
+                                               __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ]
+                                       );
                                }
                        }
                }
@@ -194,8 +196,9 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate
                        $pkDeleteConds[] = $this->mDb->makeList( (array)$row, LIST_AND );
                        if ( count( $pkDeleteConds ) >= $bSize ) {
                                $dbw->delete( $table, $dbw->makeList( $pkDeleteConds, LIST_OR ), __METHOD__ );
-                               $factory->commitMasterChanges( __METHOD__ );
-                               $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
+                               $factory->commitAndWaitForReplication(
+                                       __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ]
+                               );
                                $pkDeleteConds = [];
                        }
                }
index aed6b18..4f40c38 100644 (file)
@@ -387,15 +387,17 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
 
                foreach ( $deleteWheres as $deleteWhere ) {
                        $this->mDb->delete( $table, $deleteWhere, __METHOD__ );
-                       $factory->commitMasterChanges( __METHOD__ );
-                       $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+                       $factory->commitAndWaitForReplication(
+                               __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ]
+                       );
                }
 
                $insertBatches = array_chunk( $insertions, $bSize );
                foreach ( $insertBatches as $insertBatch ) {
                        $this->mDb->insert( $table, $insertBatch, __METHOD__, 'IGNORE' );
-                       $factory->commitMasterChanges( __METHOD__ );
-                       $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+                       $factory->commitAndWaitForReplication(
+                               __METHOD__, $this->ticket, [ 'wiki' => $this->mDb->getWikiID() ]
+                       );
                }
 
                if ( count( $insertions ) ) {
index bea33dc..b561021 100644 (file)
@@ -158,6 +158,8 @@ class CategoryMembershipChangeJob extends Job {
 
                $dbw = wfGetDB( DB_MASTER );
                $factory = wfGetLBFactory();
+               $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
+
                $catMembChange = new CategoryMembershipChange( $title, $newRev );
                $catMembChange->checkTemplateLinks();
 
@@ -168,8 +170,7 @@ class CategoryMembershipChangeJob extends Job {
                        $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
                        $catMembChange->triggerCategoryAddedNotification( $categoryTitle );
                        if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) {
-                               $factory->commitMasterChanges( __METHOD__ );
-                               $factory->waitForReplication();
+                               $factory->commitAndWaitForReplication( __METHOD__, $ticket );
                        }
                }
 
@@ -177,8 +178,7 @@ class CategoryMembershipChangeJob extends Job {
                        $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
                        $catMembChange->triggerCategoryRemovedNotification( $categoryTitle );
                        if ( $insertCount++ && ( $insertCount++ % $batchSize ) == 0 ) {
-                               $factory->commitMasterChanges( __METHOD__ );
-                               $factory->waitForReplication();
+                               $factory->commitAndWaitForReplication( __METHOD__, $ticket );
                        }
                }
        }
index f39f8fd..8d565bd 100644 (file)
@@ -20,6 +20,7 @@
  * @file
  * @ingroup JobQueue
  */
+use \MediaWiki\MediaWikiServices;
 
 /**
  * Job to prune link tables for pages that were deleted
@@ -52,10 +53,12 @@ class DeleteLinksJob extends Job {
                        return false;
                }
 
+               $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
                $timestamp = isset( $this->params['timestamp'] ) ? $this->params['timestamp'] : null;
-
                $page = WikiPage::factory( $this->title ); // title when deleted
+
                $update = new LinksDeletionUpdate( $page, $pageId, $timestamp );
+               $update->setTransactionTicket( $factory->getEmptyTransactionTicket( __METHOD__ ) );
                DataUpdate::runUpdates( [ $update ] );
 
                return true;
index 3ce4324..f09ba57 100644 (file)
@@ -114,11 +114,11 @@ class HTMLCacheUpdateJob extends Job {
 
                $dbw = wfGetDB( DB_MASTER );
                $factory = wfGetLBFactory();
+               $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
                // Update page_touched (skipping pages already touched since the root job).
                // Check $wgUpdateRowsPerQuery for sanity; batch jobs are sized by that already.
                foreach ( array_chunk( $pageIds, $wgUpdateRowsPerQuery ) as $batch ) {
-                       $factory->commitMasterChanges( __METHOD__ );
-                       $factory->waitForReplication();
+                       $factory->commitAndWaitForReplication( __METHOD__, $ticket );
 
                        $dbw->update( 'page',
                                [ 'page_touched' => $dbw->timestamp( $touchTimestamp ) ],
index 8430764..2fd3899 100644 (file)
@@ -82,6 +82,7 @@ class RecentChangesUpdateJob extends Job {
                }
 
                $factory = wfGetLBFactory();
+               $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
                $cutoff = $dbw->timestamp( time() - $wgRCMaxAge );
                do {
                        $rcIds = $dbw->selectFieldValues( 'recentchanges',
@@ -92,14 +93,11 @@ class RecentChangesUpdateJob extends Job {
                        );
                        if ( $rcIds ) {
                                $dbw->delete( 'recentchanges', [ 'rc_id' => $rcIds ], __METHOD__ );
-                       }
-                       // Commit in chunks to avoid slave lag
-                       $factory->commitMasterChanges( __METHOD__ );
-
-                       if ( count( $rcIds ) === $wgUpdateRowsPerQuery ) {
                                // There might be more, so try waiting for slaves
                                try {
-                                       wfGetLBFactory()->waitForReplication( [ 'timeout' => 3 ] );
+                                       $factory->commitAndWaitForReplication(
+                                               __METHOD__, $ticket, [ 'timeout' => 3 ]
+                                       );
                                } catch ( DBReplicationWaitError $e ) {
                                        // Another job will continue anyway
                                        break;
@@ -122,6 +120,8 @@ class RecentChangesUpdateJob extends Job {
                // JobRunner uses DBO_TRX, but doesn't call begin/commit itself;
                // onTransactionIdle() will run immediately since there is no trx.
                $dbw->onTransactionIdle( function() use ( $dbw, $days, $window ) {
+                       $factory = wfGetLBFactory();
+                       $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
                        // Avoid disconnect/ping() cycle that makes locks fall off
                        $dbw->setSessionOptions( [ 'connTimeout' => 900 ] );
 
@@ -205,7 +205,7 @@ class RecentChangesUpdateJob extends Job {
                                }
                                foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) {
                                        $dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ );
-                                       wfGetLBFactory()->waitForReplication();
+                                       $factory->commitAndWaitForReplication( __METHOD__, $ticket );
                                }
                        }
 
index 8fba728..9cdb161 100644 (file)
@@ -241,7 +241,10 @@ class RefreshLinksJob extends Job {
                        $parserOutput
                );
 
+               $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+               $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
                foreach ( $updates as $key => $update ) {
+                       $update->setTransactionTicket( $ticket );
                        // FIXME: This code probably shouldn't be here?
                        // Needed by things like Echo notifications which need
                        // to know which user caused the links update