Introduce ClearUserWatchlistJob
authoraddshore <addshorewiki@gmail.com>
Mon, 14 Mar 2016 21:07:39 +0000 (21:07 +0000)
committerAaron Schulz <aschulz@wikimedia.org>
Tue, 28 Nov 2017 17:11:40 +0000 (17:11 +0000)
Change-Id: Icea573a10078ea3f09dc2e4e9fdc737bf639935d

autoload.php
includes/DefaultSettings.php
includes/jobqueue/jobs/ClearUserWatchlistJob.php [new file with mode: 0644]
includes/watcheditem/WatchedItemStore.php
tests/phpunit/includes/jobqueue/jobs/ClearUserWatchlistJobTest.php [new file with mode: 0644]

index 5005534..5ee4447 100644 (file)
@@ -265,6 +265,7 @@ $wgAutoloadLocalClasses = [
        'CleanupRemovedModules' => __DIR__ . '/maintenance/cleanupRemovedModules.php',
        'CleanupSpam' => __DIR__ . '/maintenance/cleanupSpam.php',
        'ClearInterwikiCache' => __DIR__ . '/maintenance/clearInterwikiCache.php',
+       'ClearUserWatchlistJob' => __DIR__ . '/includes/jobqueue/jobs/ClearUserWatchlistJob.php',
        'CliInstaller' => __DIR__ . '/includes/installer/CliInstaller.php',
        'CloneDatabase' => __DIR__ . '/includes/db/CloneDatabase.php',
        'CodeCleanerGlobalsPass' => __DIR__ . '/maintenance/CodeCleanerGlobalsPass.inc',
index 3cd7ef1..e378586 100644 (file)
@@ -7428,6 +7428,7 @@ $wgJobClasses = [
        'refreshLinksDynamic' => 'RefreshLinksJob',
        'activityUpdateJob' => 'ActivityUpdateJob',
        'categoryMembershipChange' => 'CategoryMembershipChangeJob',
+       'clearUserWatchlist' => 'ClearUserWatchlistJob',
        'cdnPurge' => 'CdnPurgeJob',
        'enqueue' => 'EnqueueJob', // local queue for multi-DC setups
        'null' => 'NullJob'
diff --git a/includes/jobqueue/jobs/ClearUserWatchlistJob.php b/includes/jobqueue/jobs/ClearUserWatchlistJob.php
new file mode 100644 (file)
index 0000000..17c4b66
--- /dev/null
@@ -0,0 +1,119 @@
+<?php
+
+use MediaWiki\MediaWikiServices;
+
+/**
+ * Job to clear a users watchlist in batches.
+ *
+ * @author Addshore
+ *
+ * @ingroup JobQueue
+ * @since 1.31
+ */
+class ClearUserWatchlistJob extends Job {
+
+       /**
+        * @param User $user User to clear the watchlist for.
+        * @param int $maxWatchlistId The maximum wl_id at the time the job was first created.
+        *
+        * @return ClearUserWatchlistJob
+        */
+       public static function newForUser( User $user, $maxWatchlistId ) {
+               return new self(
+                       null,
+                       [ 'userId' => $user->getId(), 'maxWatchlistId' => $maxWatchlistId ]
+               );
+       }
+
+       /**
+        * @param Title|null $title Not used by this job.
+        * @param array $params
+        *  - batchSize,      Number of watchlist entries to remove at once.
+        *  - userId,         The ID for the user whose watchlist is being cleared.
+        *  - maxWatchlistId, The maximum wl_id at the time the job was first created,
+        */
+       public function __construct( Title $title = null, array $params ) {
+               if ( !array_key_exists( 'batchSize', $params ) ) {
+                       $params['batchSize'] = 1000;
+               }
+
+               parent::__construct(
+                       'clearUserWatchlist',
+                       SpecialPage::getTitleFor( 'EditWatchlist', 'clear' ),
+                       $params
+               );
+
+               $this->removeDuplicates = true;
+       }
+
+       public function run() {
+               $userId = $this->params['userId'];
+               $maxWatchlistId = $this->params['maxWatchlistId'];
+
+               $loadBalancer = MediaWikiServices::getInstance()->getDBLoadBalancer();
+               $dbw = $loadBalancer->getConnection( DB_MASTER );
+               $dbr = $loadBalancer->getConnection( DB_REPLICA, [ 'watchlist' ] );
+
+               // Wait before lock to try to reduce time waiting in the lock.
+               if ( !$loadBalancer->safeWaitForMasterPos( $dbr ) ) {
+                       $this->setLastError( 'Timed out while waiting for slave to catch up before lock' );
+                       return false;
+               }
+
+               // Use a named lock so that jobs for this user see each others' changes
+               $lockKey = "ClearUserWatchlistJob:$userId";
+               $scopedLock = $dbw->getScopedLockAndFlush( $lockKey, __METHOD__, 10 );
+               if ( !$scopedLock ) {
+                       $this->setLastError( "Could not acquire lock '$lockKey'" );
+                       return false;
+               }
+
+               if ( !$loadBalancer->safeWaitForMasterPos( $dbr ) ) {
+                       $this->setLastError( 'Timed out while waiting for slave to catch up within lock' );
+                       return false;
+               }
+
+               // Clear any stale REPEATABLE-READ snapshot
+               $dbr->flushSnapshot( __METHOD__ );
+
+               $watchlistIds = $dbr->selectFieldValues(
+                       'watchlist',
+                       'wl_id',
+                       [
+                               'wl_user' => $userId,
+                               'wl_id <= ' . $maxWatchlistId
+                       ],
+                       __METHOD__,
+                       [
+                               'ORDER BY' => 'wl_id ASC',
+                               'LIMIT' => $this->params['batchSize'],
+                       ]
+               );
+
+               if ( count( $watchlistIds ) == 0 ) {
+                       return true;
+               }
+
+               $dbw->delete( 'watchlist', [ 'wl_id' => $watchlistIds ], __METHOD__ );
+
+               // Commit changes and remove lock before inserting next job.
+               $lbf = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+               $lbf->commitMasterChanges( __METHOD__ );
+               unset( $scopedLock );
+
+               if ( count( $watchlistIds ) == $this->params['batchSize'] ) {
+                       JobQueueGroup::singleton()->push( new self( $this->getTitle(), $this->getParams() ) );
+               }
+
+               return true;
+       }
+
+       public function getDeduplicationInfo() {
+               $info = parent::getDeduplicationInfo();
+               // This job never has a namespace or title so we can't use it for deduplication
+               unset( $info['namespace'] );
+               unset( $info['title'] );
+               return $info;
+       }
+
+}
index 094297c..f29bd47 100644 (file)
@@ -212,6 +212,33 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
                return $this->loadBalancer->getConnectionRef( $dbIndex, [ 'watchlist' ] );
        }
 
+       /**
+        * Queues a job that will clear the users watchlist using the Job Queue.
+        *
+        * @since 1.31
+        *
+        * @param User $user
+        */
+       public function clearUserWatchedItemsUsingJobQueue( User $user ) {
+               $job = ClearUserWatchlistJob::newForUser( $user, $this->getMaxId() );
+               // TODO inject me.
+               JobQueueGroup::singleton()->push( $job );
+       }
+
+       /**
+        * @since 1.31
+        * @return int The maximum current wl_id
+        */
+       public function getMaxId() {
+               $dbr = $this->getConnectionRef( DB_REPLICA );
+               return (int)$dbr->selectField(
+                       'watchlist',
+                       'MAX(wl_id)',
+                       '',
+                       __METHOD__
+               );
+       }
+
        /**
         * @since 1.31
         */
diff --git a/tests/phpunit/includes/jobqueue/jobs/ClearUserWatchlistJobTest.php b/tests/phpunit/includes/jobqueue/jobs/ClearUserWatchlistJobTest.php
new file mode 100644 (file)
index 0000000..385ecb7
--- /dev/null
@@ -0,0 +1,78 @@
+<?php
+use MediaWiki\MediaWikiServices;
+
+/**
+ * @covers ClearUserWatchlistJob
+ *
+ * @group JobQueue
+ * @group Database
+ *
+ * @licence GNU GPL v2+
+ * @author Addshore
+ */
+class ClearUserWatchlistJobTest extends MediaWikiTestCase {
+
+       public function setUp() {
+               parent::setUp();
+               self::$users['ClearUserWatchlistJobTestUser']
+                       = new TestUser( 'ClearUserWatchlistJobTestUser' );
+               $this->runJobs();
+               JobQueueGroup::destroySingletons();
+       }
+
+       private function getUser() {
+               return self::$users['ClearUserWatchlistJobTestUser']->getUser();
+       }
+
+       private function runJobs( $jobLimit = 9999 ) {
+               $runJobs = new RunJobs;
+               $runJobs->loadParamsAndArgs( null, [ 'quiet' => true, 'maxjobs' => $jobLimit ] );
+               $runJobs->execute();
+       }
+
+       private function getWatchedItemStore() {
+               return MediaWikiServices::getInstance()->getWatchedItemStore();
+       }
+
+       public function testRun() {
+               $user = $this->getUser();
+               $watchedItemStore = $this->getWatchedItemStore();
+
+               $watchedItemStore->addWatch( $user, new TitleValue( 0, 'A' ) );
+               $watchedItemStore->addWatch( $user, new TitleValue( 1, 'A' ) );
+               $watchedItemStore->addWatch( $user, new TitleValue( 0, 'B' ) );
+               $watchedItemStore->addWatch( $user, new TitleValue( 1, 'B' ) );
+
+               $maxId = $watchedItemStore->getMaxId();
+
+               $watchedItemStore->addWatch( $user, new TitleValue( 0, 'C' ) );
+               $watchedItemStore->addWatch( $user, new TitleValue( 1, 'C' ) );
+
+               JobQueueGroup::singleton()->push(
+                       new ClearUserWatchlistJob(
+                               null,
+                               [
+                                       'userId' => $user->getId(),
+                                       'batchSize' => 2,
+                                       'maxWatchlistId' => $maxId,
+                               ]
+                       )
+               );
+
+               $this->assertEquals( 1, JobQueueGroup::singleton()->getQueueSizes()['clearUserWatchlist'] );
+               $this->assertEquals( 6, $watchedItemStore->countWatchedItems( $user ) );
+               $this->runJobs( 1 );
+               $this->assertEquals( 1, JobQueueGroup::singleton()->getQueueSizes()['clearUserWatchlist'] );
+               $this->assertEquals( 4, $watchedItemStore->countWatchedItems( $user ) );
+               $this->runJobs( 1 );
+               $this->assertEquals( 1, JobQueueGroup::singleton()->getQueueSizes()['clearUserWatchlist'] );
+               $this->assertEquals( 2, $watchedItemStore->countWatchedItems( $user ) );
+               $this->runJobs( 1 );
+               $this->assertEquals( 0, JobQueueGroup::singleton()->getQueueSizes()['clearUserWatchlist'] );
+               $this->assertEquals( 2, $watchedItemStore->countWatchedItems( $user ) );
+
+               $this->assertTrue( $watchedItemStore->isWatched( $user, new TitleValue( 0, 'C' ) ) );
+               $this->assertTrue( $watchedItemStore->isWatched( $user, new TitleValue( 1, 'C' ) ) );
+       }
+
+}