Reduce CategoryMembershipChangeJob lock timeout
[lhc/web/wiklou.git] / includes / jobqueue / jobs / CategoryMembershipChangeJob.php
index b561021..a52ff06 100644 (file)
@@ -19,6 +19,7 @@
  *
  * @file
  */
+use MediaWiki\MediaWikiServices;
 
 /**
  * Job to add recent change entries mentioning category membership changes
@@ -32,6 +33,9 @@
  * @since 1.27
  */
 class CategoryMembershipChangeJob extends Job {
+       /** @var integer|null */
+       private $ticket;
+
        const ENQUEUE_FUDGE_SEC = 60;
 
        public function __construct( Title $title, array $params ) {
@@ -42,29 +46,34 @@ class CategoryMembershipChangeJob extends Job {
        }
 
        public function run() {
+               $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+               $lb = $lbFactory->getMainLB();
+               $dbw = $lb->getConnection( DB_MASTER );
+
+               $this->ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
+
                $page = WikiPage::newFromID( $this->params['pageId'], WikiPage::READ_LATEST );
                if ( !$page ) {
                        $this->setLastError( "Could not find page #{$this->params['pageId']}" );
                        return false; // deleted?
                }
 
-               $dbw = wfGetDB( DB_MASTER );
                // Use a named lock so that jobs for this page see each others' changes
                $lockKey = "CategoryMembershipUpdates:{$page->getId()}";
-               $scopedLock = $dbw->getScopedLockAndFlush( $lockKey, __METHOD__, 10 );
+               $scopedLock = $dbw->getScopedLockAndFlush( $lockKey, __METHOD__, 3 );
                if ( !$scopedLock ) {
                        $this->setLastError( "Could not acquire lock '$lockKey'" );
                        return false;
                }
 
-               $dbr = wfGetDB( DB_SLAVE, [ 'recentchanges' ] );
-               // Wait till the slave is caught up so that jobs for this page see each others' changes
-               if ( !wfGetLB()->safeWaitForMasterPos( $dbr ) ) {
-                       $this->setLastError( "Timed out while waiting for slave to catch up" );
+               $dbr = $lb->getConnection( DB_REPLICA, [ 'recentchanges' ] );
+               // Wait till the replica DB is caught up so that jobs for this page see each others' changes
+               if ( !$lb->safeWaitForMasterPos( $dbr ) ) {
+                       $this->setLastError( "Timed out while waiting for replica DB to catch up" );
                        return false;
                }
                // Clear any stale REPEATABLE-READ snapshot
-               wfGetLBFactory()->commitAll( __METHOD__ );
+               $dbr->flushSnapshot( __METHOD__ );
 
                $cutoffUnix = wfTimestamp( TS_UNIX, $this->params['revTimestamp'] );
                // Using ENQUEUE_FUDGE_SEC handles jobs inserted out of revision order due to the delay
@@ -119,18 +128,21 @@ class CategoryMembershipChangeJob extends Job {
 
                // Apply all category updates in revision timestamp order
                foreach ( $res as $row ) {
-                       $this->notifyUpdatesForRevision( $page, Revision::newFromRow( $row ) );
+                       $this->notifyUpdatesForRevision( $lbFactory, $page, Revision::newFromRow( $row ) );
                }
 
                return true;
        }
 
        /**
+        * @param LBFactory $lbFactory
         * @param WikiPage $page
         * @param Revision $newRev
         * @throws MWException
         */
-       protected function notifyUpdatesForRevision( WikiPage $page, Revision $newRev ) {
+       protected function notifyUpdatesForRevision(
+               LBFactory $lbFactory, WikiPage $page, Revision $newRev
+       ) {
                $config = RequestContext::getMain()->getConfig();
                $title = $page->getTitle();
 
@@ -156,10 +168,6 @@ class CategoryMembershipChangeJob extends Job {
                        return; // nothing to do
                }
 
-               $dbw = wfGetDB( DB_MASTER );
-               $factory = wfGetLBFactory();
-               $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
-
                $catMembChange = new CategoryMembershipChange( $title, $newRev );
                $catMembChange->checkTemplateLinks();
 
@@ -170,7 +178,7 @@ class CategoryMembershipChangeJob extends Job {
                        $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
                        $catMembChange->triggerCategoryAddedNotification( $categoryTitle );
                        if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) {
-                               $factory->commitAndWaitForReplication( __METHOD__, $ticket );
+                               $lbFactory->commitAndWaitForReplication( __METHOD__, $this->ticket );
                        }
                }
 
@@ -178,7 +186,7 @@ class CategoryMembershipChangeJob extends Job {
                        $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
                        $catMembChange->triggerCategoryRemovedNotification( $categoryTitle );
                        if ( $insertCount++ && ( $insertCount++ % $batchSize ) == 0 ) {
-                               $factory->commitAndWaitForReplication( __METHOD__, $ticket );
+                               $lbFactory->commitAndWaitForReplication( __METHOD__, $this->ticket );
                        }
                }
        }