Merge "parser: Remove trailing newline after prefixes have been cleared"
[lhc/web/wiklou.git] / includes / watcheditem / WatchedItemStore.php
index a9bba7a..e092859 100644 (file)
@@ -4,7 +4,6 @@ use Wikimedia\Rdbms\IDatabase;
 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
 use MediaWiki\Linker\LinkTarget;
 use Wikimedia\Assert\Assert;
-use Wikimedia\Rdbms\LBFactory;
 use Wikimedia\ScopedCallback;
 use Wikimedia\Rdbms\ILBFactory;
 use Wikimedia\Rdbms\LoadBalancer;
@@ -29,6 +28,16 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
         */
        private $loadBalancer;
 
+       /**
+        * @var JobQueueGroup
+        */
+       private $queueGroup;
+
+       /**
+        * @var BagOStuff
+        */
+       private $stash;
+
        /**
         * @var ReadOnlyMode
         */
@@ -39,6 +48,11 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
         */
        private $cache;
 
+       /**
+        * @var HashBagOStuff
+        */
+       private $latestUpdateCache;
+
        /**
         * @var array[] Looks like $cacheIndex[Namespace ID][Target DB Key][User Id] => 'key'
         * The index is needed so that on mass changes all relevant items can be un-cached.
@@ -69,18 +83,24 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
 
        /**
         * @param ILBFactory $lbFactory
+        * @param JobQueueGroup $queueGroup
+        * @param BagOStuff $stash
         * @param HashBagOStuff $cache
         * @param ReadOnlyMode $readOnlyMode
         * @param int $updateRowsPerQuery
         */
        public function __construct(
                ILBFactory $lbFactory,
+               JobQueueGroup $queueGroup,
+               BagOStuff $stash,
                HashBagOStuff $cache,
                ReadOnlyMode $readOnlyMode,
                $updateRowsPerQuery
        ) {
                $this->lbFactory = $lbFactory;
                $this->loadBalancer = $lbFactory->getMainLB();
+               $this->queueGroup = $queueGroup;
+               $this->stash = $stash;
                $this->cache = $cache;
                $this->readOnlyMode = $readOnlyMode;
                $this->stats = new NullStatsdDataFactory();
@@ -89,6 +109,8 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
                $this->revisionGetTimestampFromIdCallback =
                        [ Revision::class, 'getTimestampFromId' ];
                $this->updateRowsPerQuery = $updateRowsPerQuery;
+
+               $this->latestUpdateCache = new HashBagOStuff( [ 'maxKeys' => 3 ] );
        }
 
        /**
@@ -287,8 +309,7 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
         */
        public function clearUserWatchedItemsUsingJobQueue( User $user ) {
                $job = ClearUserWatchlistJob::newForUser( $user, $this->getMaxId() );
-               // TODO inject me.
-               JobQueueGroup::singleton()->push( $job );
+               $this->queueGroup->push( $job );
        }
 
        /**
@@ -569,6 +590,7 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
                }
 
                $dbr = $this->getConnectionRef( DB_REPLICA );
+
                $row = $dbr->selectRow(
                        'watchlist',
                        'wl_notificationtimestamp',
@@ -583,7 +605,7 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
                $item = new WatchedItem(
                        $user,
                        $target,
-                       wfTimestampOrNull( TS_MW, $row->wl_notificationtimestamp )
+                       $this->getLatestNotificationTimestamp( $row->wl_notificationtimestamp, $user, $target )
                );
                $this->cache( $item );
 
@@ -623,11 +645,13 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
 
                $watchedItems = [];
                foreach ( $res as $row ) {
+                       $target = new TitleValue( (int)$row->wl_namespace, $row->wl_title );
                        // @todo: Should we add these to the process cache?
                        $watchedItems[] = new WatchedItem(
                                $user,
                                new TitleValue( (int)$row->wl_namespace, $row->wl_title ),
-                               $row->wl_notificationtimestamp
+                               $this->getLatestNotificationTimestamp(
+                                       $row->wl_notificationtimestamp, $user, $target )
                        );
                }
 
@@ -689,8 +713,10 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
                );
 
                foreach ( $res as $row ) {
+                       $target = new TitleValue( (int)$row->wl_namespace, $row->wl_title );
                        $timestamps[$row->wl_namespace][$row->wl_title] =
-                               wfTimestampOrNull( TS_MW, $row->wl_notificationtimestamp );
+                               $this->getLatestNotificationTimestamp(
+                                       $row->wl_notificationtimestamp, $user, $target );
                }
 
                return $timestamps;
@@ -803,7 +829,7 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
                        $timestamp = $dbw->timestamp( $timestamp );
                }
 
-               $success = $dbw->update(
+               $dbw->update(
                        'watchlist',
                        [ 'wl_notificationtimestamp' => $timestamp ],
                        $conds,
@@ -812,7 +838,25 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
 
                $this->uncacheUser( $user );
 
-               return $success;
+               return true;
+       }
+
+       public function getLatestNotificationTimestamp( $timestamp, User $user, LinkTarget $target ) {
+               $timestamp = wfTimestampOrNull( TS_MW, $timestamp );
+               if ( $timestamp === null ) {
+                       return null; // no notification
+               }
+
+               $seenTimestamps = $this->getPageSeenTimestamps( $user );
+               if (
+                       $seenTimestamps &&
+                       $seenTimestamps->get( $this->getPageSeenKey( $target ) ) >= $timestamp
+               ) {
+                       // If a reset job did not yet run, then the "seen" timestamp will be higher
+                       return null;
+               }
+
+               return $timestamp;
        }
 
        public function resetAllNotificationTimestampsForUser( User $user ) {
@@ -903,11 +947,17 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
         * @return bool
         */
        public function resetNotificationTimestamp( User $user, Title $title, $force = '', $oldid = 0 ) {
+               $time = time();
+
                // Only loggedin user can have a watchlist
                if ( $this->readOnlyMode->isReadOnly() || $user->isAnon() ) {
                        return false;
                }
 
+               if ( !Hooks::run( 'BeforeResetNotificationTimestamp', [ &$user, &$title, $force, &$oldid ] ) ) {
+                       return false;
+               }
+
                $item = null;
                if ( $force != 'force' ) {
                        $item = $this->loadWatchedItem( $user, $title );
@@ -916,6 +966,37 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
                        }
                }
 
+               // Get the timestamp (TS_MW) of this revision to track the latest one seen
+               $seenTime = call_user_func(
+                       $this->revisionGetTimestampFromIdCallback,
+                       $title,
+                       $oldid ?: $title->getLatestRevID()
+               );
+
+               // Mark the item as read immediately in lightweight storage
+               $this->stash->merge(
+                       $this->getPageSeenTimestampsKey( $user ),
+                       function ( $cache, $key, $current ) use ( $title, $seenTime ) {
+                               $value = $current ?: new MapCacheLRU( 300 );
+                               $subKey = $this->getPageSeenKey( $title );
+
+                               if ( $seenTime > $value->get( $subKey ) ) {
+                                       // Revision is newer than the last one seen
+                                       $value->set( $subKey, $seenTime );
+                                       $this->latestUpdateCache->set( $key, $value, IExpiringStore::TTL_PROC_LONG );
+                               } elseif ( $seenTime === false ) {
+                                       // Revision does not exist
+                                       $value->set( $subKey, wfTimestamp( TS_MW ) );
+                                       $this->latestUpdateCache->set( $key, $value, IExpiringStore::TTL_PROC_LONG );
+                               } else {
+                                       return false; // nothing to update
+                               }
+
+                               return $value;
+                       },
+                       IExpiringStore::TTL_HOUR
+               );
+
                // If the page is watched by the user (or may be watched), update the timestamp
                $job = new ActivityUpdateJob(
                        $title,
@@ -923,22 +1004,51 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
                                'type'      => 'updateWatchlistNotification',
                                'userid'    => $user->getId(),
                                'notifTime' => $this->getNotificationTimestamp( $user, $title, $item, $force, $oldid ),
-                               'curTime'   => time()
+                               'curTime'   => $time
                        ]
                );
+               // Try to enqueue this post-send
+               $this->queueGroup->lazyPush( $job );
 
-               // Try to run this post-send
-               // Calls DeferredUpdates::addCallableUpdate in normal operation
-               call_user_func(
-                       $this->deferredUpdatesAddCallableUpdateCallback,
-                       function () use ( $job ) {
-                               $job->run();
+               $this->uncache( $user, $title );
+
+               return true;
+       }
+
+       /**
+        * @param User $user
+        * @return MapCacheLRU|null The map contains prefixed title keys and TS_MW values
+        */
+       private function getPageSeenTimestamps( User $user ) {
+               $key = $this->getPageSeenTimestampsKey( $user );
+
+               return $this->latestUpdateCache->getWithSetCallback(
+                       $key,
+                       IExpiringStore::TTL_PROC_LONG,
+                       function () use ( $key ) {
+                               return $this->stash->get( $key ) ?: null;
                        }
                );
+       }
 
-               $this->uncache( $user, $title );
+       /**
+        * @param User $user
+        * @return string
+        */
+       private function getPageSeenTimestampsKey( User $user ) {
+               return $this->stash->makeGlobalKey(
+                       'watchlist-recent-updates',
+                       $this->lbFactory->getLocalDomainID(),
+                       $user->getId()
+               );
+       }
 
-               return true;
+       /**
+        * @param LinkTarget $target
+        * @return string
+        */
+       private function getPageSeenKey( LinkTarget $target ) {
+               return "{$target->getNamespace()}:{$target->getDBkey()}";
        }
 
        private function getNotificationTimestamp( User $user, Title $title, $item, $force, $oldid ) {
@@ -995,25 +1105,22 @@ class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterfac
         * @return int|bool
         */
        public function countUnreadNotifications( User $user, $unreadLimit = null ) {
+               $dbr = $this->getConnectionRef( DB_REPLICA );
+
                $queryOptions = [];
                if ( $unreadLimit !== null ) {
                        $unreadLimit = (int)$unreadLimit;
                        $queryOptions['LIMIT'] = $unreadLimit;
                }
 
-               $dbr = $this->getConnectionRef( DB_REPLICA );
-               $rowCount = $dbr->selectRowCount(
-                       'watchlist',
-                       '1',
-                       [
-                               'wl_user' => $user->getId(),
-                               'wl_notificationtimestamp IS NOT NULL',
-                       ],
-                       __METHOD__,
-                       $queryOptions
-               );
+               $conds = [
+                       'wl_user' => $user->getId(),
+                       'wl_notificationtimestamp IS NOT NULL'
+               ];
+
+               $rowCount = $dbr->selectRowCount( 'watchlist', '1', $conds, __METHOD__, $queryOptions );
 
-               if ( !isset( $unreadLimit ) ) {
+               if ( $unreadLimit === null ) {
                        return $rowCount;
                }