use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
use MediaWiki\Linker\LinkTarget;
use Wikimedia\Assert\Assert;
-use Wikimedia\Rdbms\LBFactory;
use Wikimedia\ScopedCallback;
use Wikimedia\Rdbms\ILBFactory;
use Wikimedia\Rdbms\LoadBalancer;
*/
private $loadBalancer;
+ /**
+ * @var JobQueueGroup
+ */
+ private $queueGroup;
+
+ /**
+ * @var BagOStuff
+ */
+ private $stash;
+
/**
* @var ReadOnlyMode
*/
*/
private $cache;
+ /**
+ * @var HashBagOStuff
+ */
+ private $latestUpdateCache;
+
/**
* @var array[] Looks like $cacheIndex[Namespace ID][Target DB Key][User Id] => 'key'
* The index is needed so that on mass changes all relevant items can be un-cached.
/**
* @param ILBFactory $lbFactory
+ * @param JobQueueGroup $queueGroup
+ * @param BagOStuff $stash
* @param HashBagOStuff $cache
* @param ReadOnlyMode $readOnlyMode
* @param int $updateRowsPerQuery
*/
public function __construct(
ILBFactory $lbFactory,
+ JobQueueGroup $queueGroup,
+ BagOStuff $stash,
HashBagOStuff $cache,
ReadOnlyMode $readOnlyMode,
$updateRowsPerQuery
) {
$this->lbFactory = $lbFactory;
$this->loadBalancer = $lbFactory->getMainLB();
+ $this->queueGroup = $queueGroup;
+ $this->stash = $stash;
$this->cache = $cache;
$this->readOnlyMode = $readOnlyMode;
$this->stats = new NullStatsdDataFactory();
$this->revisionGetTimestampFromIdCallback =
[ Revision::class, 'getTimestampFromId' ];
$this->updateRowsPerQuery = $updateRowsPerQuery;
+
+ $this->latestUpdateCache = new HashBagOStuff( [ 'maxKeys' => 3 ] );
}
/**
*/
public function clearUserWatchedItemsUsingJobQueue( User $user ) {
$job = ClearUserWatchlistJob::newForUser( $user, $this->getMaxId() );
- // TODO inject me.
- JobQueueGroup::singleton()->push( $job );
+ $this->queueGroup->push( $job );
}
/**
}
$dbr = $this->getConnectionRef( DB_REPLICA );
+
$row = $dbr->selectRow(
'watchlist',
'wl_notificationtimestamp',
$item = new WatchedItem(
$user,
$target,
- wfTimestampOrNull( TS_MW, $row->wl_notificationtimestamp )
+ $this->getLatestNotificationTimestamp( $row->wl_notificationtimestamp, $user, $target )
);
$this->cache( $item );
$watchedItems = [];
foreach ( $res as $row ) {
+ $target = new TitleValue( (int)$row->wl_namespace, $row->wl_title );
// @todo: Should we add these to the process cache?
$watchedItems[] = new WatchedItem(
$user,
new TitleValue( (int)$row->wl_namespace, $row->wl_title ),
- $row->wl_notificationtimestamp
+ $this->getLatestNotificationTimestamp(
+ $row->wl_notificationtimestamp, $user, $target )
);
}
);
foreach ( $res as $row ) {
+ $target = new TitleValue( (int)$row->wl_namespace, $row->wl_title );
$timestamps[$row->wl_namespace][$row->wl_title] =
- wfTimestampOrNull( TS_MW, $row->wl_notificationtimestamp );
+ $this->getLatestNotificationTimestamp(
+ $row->wl_notificationtimestamp, $user, $target );
}
return $timestamps;
$timestamp = $dbw->timestamp( $timestamp );
}
- $success = $dbw->update(
+ $dbw->update(
'watchlist',
[ 'wl_notificationtimestamp' => $timestamp ],
$conds,
$this->uncacheUser( $user );
- return $success;
+ return true;
+ }
+
+ public function getLatestNotificationTimestamp( $timestamp, User $user, LinkTarget $target ) {
+ $timestamp = wfTimestampOrNull( TS_MW, $timestamp );
+ if ( $timestamp === null ) {
+ return null; // no notification
+ }
+
+ $seenTimestamps = $this->getPageSeenTimestamps( $user );
+ if (
+ $seenTimestamps &&
+ $seenTimestamps->get( $this->getPageSeenKey( $target ) ) >= $timestamp
+ ) {
+ // If a reset job did not yet run, then the "seen" timestamp will be higher
+ return null;
+ }
+
+ return $timestamp;
}
public function resetAllNotificationTimestampsForUser( User $user ) {
* @return bool
*/
public function resetNotificationTimestamp( User $user, Title $title, $force = '', $oldid = 0 ) {
+ $time = time();
+
// Only loggedin user can have a watchlist
if ( $this->readOnlyMode->isReadOnly() || $user->isAnon() ) {
return false;
}
+ if ( !Hooks::run( 'BeforeResetNotificationTimestamp', [ &$user, &$title, $force, &$oldid ] ) ) {
+ return false;
+ }
+
$item = null;
if ( $force != 'force' ) {
$item = $this->loadWatchedItem( $user, $title );
}
}
+ // Get the timestamp (TS_MW) of this revision to track the latest one seen
+ $seenTime = call_user_func(
+ $this->revisionGetTimestampFromIdCallback,
+ $title,
+ $oldid ?: $title->getLatestRevID()
+ );
+
+ // Mark the item as read immediately in lightweight storage
+ $this->stash->merge(
+ $this->getPageSeenTimestampsKey( $user ),
+ function ( $cache, $key, $current ) use ( $title, $seenTime ) {
+ $value = $current ?: new MapCacheLRU( 300 );
+ $subKey = $this->getPageSeenKey( $title );
+
+ if ( $seenTime > $value->get( $subKey ) ) {
+ // Revision is newer than the last one seen
+ $value->set( $subKey, $seenTime );
+ $this->latestUpdateCache->set( $key, $value, IExpiringStore::TTL_PROC_LONG );
+ } elseif ( $seenTime === false ) {
+ // Revision does not exist
+ $value->set( $subKey, wfTimestamp( TS_MW ) );
+ $this->latestUpdateCache->set( $key, $value, IExpiringStore::TTL_PROC_LONG );
+ } else {
+ return false; // nothing to update
+ }
+
+ return $value;
+ },
+ IExpiringStore::TTL_HOUR
+ );
+
// If the page is watched by the user (or may be watched), update the timestamp
$job = new ActivityUpdateJob(
$title,
'type' => 'updateWatchlistNotification',
'userid' => $user->getId(),
'notifTime' => $this->getNotificationTimestamp( $user, $title, $item, $force, $oldid ),
- 'curTime' => time()
+ 'curTime' => $time
]
);
+ // Try to enqueue this post-send
+ $this->queueGroup->lazyPush( $job );
- // Try to run this post-send
- // Calls DeferredUpdates::addCallableUpdate in normal operation
- call_user_func(
- $this->deferredUpdatesAddCallableUpdateCallback,
- function () use ( $job ) {
- $job->run();
+ $this->uncache( $user, $title );
+
+ return true;
+ }
+
+ /**
+ * @param User $user
+ * @return MapCacheLRU|null The map contains prefixed title keys and TS_MW values
+ */
+ private function getPageSeenTimestamps( User $user ) {
+ $key = $this->getPageSeenTimestampsKey( $user );
+
+ return $this->latestUpdateCache->getWithSetCallback(
+ $key,
+ IExpiringStore::TTL_PROC_LONG,
+ function () use ( $key ) {
+ return $this->stash->get( $key ) ?: null;
}
);
+ }
- $this->uncache( $user, $title );
+ /**
+ * @param User $user
+ * @return string
+ */
+ private function getPageSeenTimestampsKey( User $user ) {
+ return $this->stash->makeGlobalKey(
+ 'watchlist-recent-updates',
+ $this->lbFactory->getLocalDomainID(),
+ $user->getId()
+ );
+ }
- return true;
+ /**
+ * @param LinkTarget $target
+ * @return string
+ */
+ private function getPageSeenKey( LinkTarget $target ) {
+ return "{$target->getNamespace()}:{$target->getDBkey()}";
}
private function getNotificationTimestamp( User $user, Title $title, $item, $force, $oldid ) {
* @return int|bool
*/
public function countUnreadNotifications( User $user, $unreadLimit = null ) {
+ $dbr = $this->getConnectionRef( DB_REPLICA );
+
$queryOptions = [];
if ( $unreadLimit !== null ) {
$unreadLimit = (int)$unreadLimit;
$queryOptions['LIMIT'] = $unreadLimit;
}
- $dbr = $this->getConnectionRef( DB_REPLICA );
- $rowCount = $dbr->selectRowCount(
- 'watchlist',
- '1',
- [
- 'wl_user' => $user->getId(),
- 'wl_notificationtimestamp IS NOT NULL',
- ],
- __METHOD__,
- $queryOptions
- );
+ $conds = [
+ 'wl_user' => $user->getId(),
+ 'wl_notificationtimestamp IS NOT NULL'
+ ];
+
+ $rowCount = $dbr->selectRowCount( 'watchlist', '1', $conds, __METHOD__, $queryOptions );
- if ( !isset( $unreadLimit ) ) {
+ if ( $unreadLimit === null ) {
return $rowCount;
}