Database transaction flushing cleanups
authorAaron Schulz <aschulz@wikimedia.org>
Sat, 13 Aug 2016 01:56:41 +0000 (18:56 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 16 Aug 2016 20:34:54 +0000 (20:34 +0000)
* Do not commit() inside masterPosWait(). This could happen
  inside JobRunner::commitMasterChanges, resulting in
  one DB committing while the others may or may not later commit.
* Migrate some commit() callers to commitMasterChanges().
* Removed unsafe upload class commit() which could break
  outer transactions.
* Also cleaned up the "flush" flag to make it harder to misuse.

Change-Id: I75193baaed979100c5338abe0c0899abba3444cb

includes/WatchedItemStore.php
includes/db/Database.php
includes/db/DatabaseMysqlBase.php
includes/db/IDatabase.php
includes/db/loadbalancer/LoadBalancer.php
includes/deferred/LinksDeletionUpdate.php
includes/deferred/LinksUpdate.php
includes/jobqueue/jobs/CategoryMembershipChangeJob.php
includes/jobqueue/jobs/HTMLCacheUpdateJob.php
includes/jobqueue/jobs/RecentChangesUpdateJob.php
includes/upload/UploadFromChunks.php

index 89ca50c..92cdee5 100644 (file)
@@ -741,6 +741,7 @@ class WatchedItemStore implements StatsdAwareInterface {
                                        global $wgUpdateRowsPerQuery;
 
                                        $dbw = $this->getConnection( DB_MASTER );
+                                       $factory = wfGetLBFactory();
 
                                        $watchersChunks = array_chunk( $watchers, $wgUpdateRowsPerQuery );
                                        foreach ( $watchersChunks as $watchersChunk ) {
@@ -754,8 +755,8 @@ class WatchedItemStore implements StatsdAwareInterface {
                                                        ], $fname
                                                );
                                                if ( count( $watchersChunks ) > 1 ) {
-                                                       $dbw->commit( __METHOD__, 'flush' );
-                                                       wfGetLBFactory()->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
+                                                       $factory->commitMasterChanges( __METHOD__ );
+                                                       $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
                                                }
                                        }
                                        $this->uncacheLinkTarget( $target );
index 4e358d4..3ea2cda 100644 (file)
@@ -703,7 +703,7 @@ abstract class DatabaseBase implements IDatabase {
                                                " performing implicit commit before closing connection!" );
                                }
 
-                               $this->commit( __METHOD__, 'flush' );
+                               $this->commit( __METHOD__, self::FLUSHING_INTERNAL );
                        }
 
                        $closed = $this->closeConnection();
@@ -2651,7 +2651,7 @@ abstract class DatabaseBase implements IDatabase {
                }
 
                if ( !$this->mTrxAtomicLevels && $this->mTrxAutomaticAtomic ) {
-                       $this->commit( $fname, 'flush' );
+                       $this->commit( $fname, self::FLUSHING_INTERNAL );
                }
        }
 
@@ -2746,7 +2746,7 @@ abstract class DatabaseBase implements IDatabase {
                        );
                }
 
-               if ( $flush === 'flush' ) {
+               if ( $flush === self::FLUSHING_INTERNAL || $flush === self::FLUSHING_ALL_PEERS ) {
                        if ( !$this->mTrxLevel ) {
                                return; // nothing to do
                        } elseif ( !$this->mTrxAutomatic ) {
@@ -2796,7 +2796,7 @@ abstract class DatabaseBase implements IDatabase {
        }
 
        final public function rollback( $fname = __METHOD__, $flush = '' ) {
-               if ( $flush !== 'flush' ) {
+               if ( $flush !== self::FLUSHING_INTERNAL && $flush !== self::FLUSHING_ALL_PEERS ) {
                        if ( !$this->mTrxLevel ) {
                                wfWarn( "$fname: No transaction to rollback, something got out of sync!" );
                                return; // nothing to do
@@ -3302,11 +3302,11 @@ abstract class DatabaseBase implements IDatabase {
                }
 
                $unlocker = new ScopedCallback( function () use ( $lockKey, $fname ) {
-                       $this->commit( __METHOD__, 'flush' );
+                       $this->commit( __METHOD__, self::FLUSHING_INTERNAL );
                        $this->unlock( $lockKey, $fname );
                } );
 
-               $this->commit( __METHOD__, 'flush' );
+               $this->commit( __METHOD__, self::FLUSHING_INTERNAL );
 
                return $unlocker;
        }
index 6380fe7..d1ebe62 100644 (file)
@@ -767,9 +767,6 @@ abstract class DatabaseMysqlBase extends Database {
                        return 0; // already reached this point for sure
                }
 
-               // Commit any open transactions
-               $this->commit( __METHOD__, 'flush' );
-
                // Call doQuery() directly, to avoid opening a transaction if DBO_TRX is set
                if ( $this->useGTIDs && $pos->gtids ) {
                        // Wait on the GTID set (MariaDB only)
index 43592e2..af024b8 100644 (file)
  * @ingroup Database
  */
 interface IDatabase {
-       /* Constants to onTransactionResolution() callbacks */
+       /** @var int Callback triggered immediately due to no active transaction */
        const TRIGGER_IDLE = 1;
+       /** @var int Callback triggered by commit */
        const TRIGGER_COMMIT = 2;
+       /** @var int Callback triggered by rollback */
        const TRIGGER_ROLLBACK = 3;
 
+       /** @var string Transaction operation comes from service managing all DBs */
+       const FLUSHING_ALL_PEERS = 'flush';
+       /** @var string Transaction operation comes from the database class internally */
+       const FLUSHING_INTERNAL = 'flush';
+
        /**
         * A string describing the current software version, and possibly
         * other details in a user-friendly way. Will be listed on Special:Version, etc.
@@ -1366,9 +1373,9 @@ interface IDatabase {
         * Nesting of transactions is not supported.
         *
         * @param string $fname
-        * @param string $flush Flush flag, set to 'flush' to disable warnings about
-        *   explicitly committing implicit transactions, or calling commit when no
-        *   transaction is in progress.
+        * @param string $flush Flush flag, set to situationally valid IDatabase::FLUSHING_*
+        *   constant to disable warnings about explicitly committing implicit transactions,
+        *   or calling commit when no transaction is in progress.
         *
         *   This will trigger an exception if there is an ongoing explicit transaction.
         *
@@ -1386,10 +1393,10 @@ interface IDatabase {
         * No-op on non-transactional databases.
         *
         * @param string $fname
-        * @param string $flush Flush flag, set to 'flush' to disable warnings about
-        *   calling rollback when no transaction is in progress. This will silently
-        *   break any ongoing explicit transaction. Only set the flush flag if you
-        *   are sure that it is safe to ignore these warnings in your context.
+        * @param string $flush Flush flag, set to a situationally valid IDatabase::FLUSHING_*
+        *   constant to disable warnings about calling rollback when no transaction is in
+        *   progress. This will silently break any ongoing explicit transaction. Only set the
+        *   flush flag if you are sure that it is safe to ignore these warnings in your context.
         * @throws DBUnexpectedError
         * @since 1.23 Added $flush parameter
         */
index a7c486c..d08172a 100644 (file)
@@ -1062,7 +1062,7 @@ class LoadBalancer {
         */
        public function commitAll( $fname = __METHOD__ ) {
                $this->forEachOpenConnection( function ( DatabaseBase $conn ) use ( $fname ) {
-                       $conn->commit( $fname, 'flush' );
+                       $conn->commit( $fname, IDatabase::FLUSHING_ALL_PEERS );
                } );
        }
 
@@ -1109,7 +1109,7 @@ class LoadBalancer {
        public function commitMasterChanges( $fname = __METHOD__ ) {
                $this->forEachOpenMasterConnection( function ( DatabaseBase $conn ) use ( $fname ) {
                        if ( $conn->writesOrCallbacksPending() ) {
-                               $conn->commit( $fname, 'flush' );
+                               $conn->commit( $fname, IDatabase::FLUSHING_ALL_PEERS );
                        }
                } );
        }
@@ -1143,7 +1143,7 @@ class LoadBalancer {
                        foreach ( $conns2[$masterIndex] as $conn ) {
                                if ( $conn->trxLevel() && $conn->writesOrCallbacksPending() ) {
                                        try {
-                                               $conn->rollback( $fname, 'flush' );
+                                               $conn->rollback( $fname, IDatabase::FLUSHING_ALL_PEERS );
                                        } catch ( DBError $e ) {
                                                MWExceptionHandler::logException( $e );
                                                $failedServers[] = $conn->getServer();
index 0009781..f96df48 100644 (file)
@@ -54,6 +54,7 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate
        public function doUpdate() {
                $config = RequestContext::getMain()->getConfig();
                $batchSize = $config->get( 'UpdateRowsPerQuery' );
+               $factory = wfGetLBFactory();
 
                // Page may already be deleted, so don't just getId()
                $id = $this->pageId;
@@ -77,8 +78,8 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate
                foreach ( $catBatches as $catBatch ) {
                        $this->page->updateCategoryCounts( [], $catBatch, $id );
                        if ( count( $catBatches ) > 1 ) {
-                               $this->mDb->commit( __METHOD__, 'flush' );
-                               wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+                               $factory->commitMasterChanges( __METHOD__ );
+                               $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
                        }
                }
 
@@ -173,8 +174,8 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate
                        foreach ( $rcIdBatches as $rcIdBatch ) {
                                $this->mDb->delete( 'recentchanges', [ 'rc_id' => $rcIdBatch ], __METHOD__ );
                                if ( count( $rcIdBatches ) > 1 ) {
-                                       $this->mDb->commit( __METHOD__, 'flush' );
-                                       wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+                                       $factory->commitMasterChanges( __METHOD__ );
+                                       $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
                                }
                        }
                }
@@ -185,6 +186,7 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate
 
        private function batchDeleteByPK( $table, array $conds, array $pk, $bSize ) {
                $dbw = $this->mDb; // convenience
+               $factory = wfGetLBFactory();
                $res = $dbw->select( $table, $pk, $conds, __METHOD__ );
 
                $pkDeleteConds = [];
@@ -192,8 +194,8 @@ class LinksDeletionUpdate extends SqlDataUpdate implements EnqueueableDataUpdate
                        $pkDeleteConds[] = $this->mDb->makeList( (array)$row, LIST_AND );
                        if ( count( $pkDeleteConds ) >= $bSize ) {
                                $dbw->delete( $table, $dbw->makeList( $pkDeleteConds, LIST_OR ), __METHOD__ );
-                               $dbw->commit( __METHOD__, 'flush' );
-                               wfGetLBFactory()->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
+                               $factory->commitMasterChanges( __METHOD__ );
+                               $factory->waitForReplication( [ 'wiki' => $dbw->getWikiID() ] );
                                $pkDeleteConds = [];
                        }
                }
index 22944eb..aed6b18 100644 (file)
@@ -335,6 +335,7 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
         */
        private function incrTableUpdate( $table, $prefix, $deletions, $insertions ) {
                $bSize = RequestContext::getMain()->getConfig()->get( 'UpdateRowsPerQuery' );
+               $factory = wfGetLBFactory();
 
                if ( $table === 'page_props' ) {
                        $fromField = 'pp_page';
@@ -386,15 +387,15 @@ class LinksUpdate extends SqlDataUpdate implements EnqueueableDataUpdate {
 
                foreach ( $deleteWheres as $deleteWhere ) {
                        $this->mDb->delete( $table, $deleteWhere, __METHOD__ );
-                       $this->mDb->commit( __METHOD__, 'flush' );
-                       wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+                       $factory->commitMasterChanges( __METHOD__ );
+                       $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
                }
 
                $insertBatches = array_chunk( $insertions, $bSize );
                foreach ( $insertBatches as $insertBatch ) {
                        $this->mDb->insert( $table, $insertBatch, __METHOD__, 'IGNORE' );
-                       $this->mDb->commit( __METHOD__, 'flush' );
-                       wfGetLBFactory()->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
+                       $factory->commitMasterChanges( __METHOD__ );
+                       $factory->waitForReplication( [ 'wiki' => $this->mDb->getWikiID() ] );
                }
 
                if ( count( $insertions ) ) {
index 57e69b4..bea33dc 100644 (file)
@@ -64,7 +64,7 @@ class CategoryMembershipChangeJob extends Job {
                        return false;
                }
                // Clear any stale REPEATABLE-READ snapshot
-               $dbr->commit( __METHOD__, 'flush' );
+               wfGetLBFactory()->commitAll( __METHOD__ );
 
                $cutoffUnix = wfTimestamp( TS_UNIX, $this->params['revTimestamp'] );
                // Using ENQUEUE_FUDGE_SEC handles jobs inserted out of revision order due to the delay
@@ -157,6 +157,7 @@ class CategoryMembershipChangeJob extends Job {
                }
 
                $dbw = wfGetDB( DB_MASTER );
+               $factory = wfGetLBFactory();
                $catMembChange = new CategoryMembershipChange( $title, $newRev );
                $catMembChange->checkTemplateLinks();
 
@@ -167,8 +168,8 @@ class CategoryMembershipChangeJob extends Job {
                        $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
                        $catMembChange->triggerCategoryAddedNotification( $categoryTitle );
                        if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) {
-                               $dbw->commit( __METHOD__, 'flush' );
-                               wfGetLBFactory()->waitForReplication();
+                               $factory->commitMasterChanges( __METHOD__ );
+                               $factory->waitForReplication();
                        }
                }
 
@@ -176,8 +177,8 @@ class CategoryMembershipChangeJob extends Job {
                        $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
                        $catMembChange->triggerCategoryRemovedNotification( $categoryTitle );
                        if ( $insertCount++ && ( $insertCount++ % $batchSize ) == 0 ) {
-                               $dbw->commit( __METHOD__, 'flush' );
-                               wfGetLBFactory()->waitForReplication();
+                               $factory->commitMasterChanges( __METHOD__ );
+                               $factory->waitForReplication();
                        }
                }
        }
index a14cdd7..3ce4324 100644 (file)
@@ -113,11 +113,12 @@ class HTMLCacheUpdateJob extends Job {
                $touchTimestamp = wfTimestampNow();
 
                $dbw = wfGetDB( DB_MASTER );
+               $factory = wfGetLBFactory();
                // Update page_touched (skipping pages already touched since the root job).
                // Check $wgUpdateRowsPerQuery for sanity; batch jobs are sized by that already.
                foreach ( array_chunk( $pageIds, $wgUpdateRowsPerQuery ) as $batch ) {
-                       $dbw->commit( __METHOD__, 'flush' );
-                       wfGetLBFactory()->waitForReplication();
+                       $factory->commitMasterChanges( __METHOD__ );
+                       $factory->waitForReplication();
 
                        $dbw->update( 'page',
                                [ 'page_touched' => $dbw->timestamp( $touchTimestamp ) ],
index fbc1572..8430764 100644 (file)
@@ -81,6 +81,7 @@ class RecentChangesUpdateJob extends Job {
                        return; // already in progress
                }
 
+               $factory = wfGetLBFactory();
                $cutoff = $dbw->timestamp( time() - $wgRCMaxAge );
                do {
                        $rcIds = $dbw->selectFieldValues( 'recentchanges',
@@ -93,7 +94,7 @@ class RecentChangesUpdateJob extends Job {
                                $dbw->delete( 'recentchanges', [ 'rc_id' => $rcIds ], __METHOD__ );
                        }
                        // Commit in chunks to avoid slave lag
-                       $dbw->commit( __METHOD__, 'flush' );
+                       $factory->commitMasterChanges( __METHOD__ );
 
                        if ( count( $rcIds ) === $wgUpdateRowsPerQuery ) {
                                // There might be more, so try waiting for slaves
index cc1d698..247f608 100644 (file)
@@ -235,8 +235,6 @@ class UploadFromChunks extends UploadFromFile {
                        $this->getOffset() . ' inx:' . $this->getChunkIndex() . "\n" );
 
                $dbw = $this->repo->getMasterDB();
-               // Use a quick transaction since we will upload the full temp file into shared
-               // storage, which takes time for large files. We don't want to hold locks then.
                $dbw->update(
                        'uploadstash',
                        [
@@ -247,7 +245,6 @@ class UploadFromChunks extends UploadFromFile {
                        [ 'us_key' => $this->mFileKey ],
                        __METHOD__
                );
-               $dbw->commit( __METHOD__, 'flush' );
        }
 
        /**