Use commit-and-wait when processing more than updateRowsPerQuery
[lhc/web/wiklou.git] / includes / watcheditem / WatchedItemStore.php
1 <?php
2
3 use Wikimedia\Rdbms\IDatabase;
4 use Liuggio\StatsdClient\Factory\StatsdDataFactoryInterface;
5 use MediaWiki\Linker\LinkTarget;
6 use Wikimedia\Assert\Assert;
7 use Wikimedia\Rdbms\LBFactory;
8 use Wikimedia\ScopedCallback;
9 use Wikimedia\Rdbms\ILBFactory;
10 use Wikimedia\Rdbms\LoadBalancer;
11
12 /**
13 * Storage layer class for WatchedItems.
14 * Database interaction & caching
15 * TODO caching should be factored out into a CachingWatchedItemStore class
16 *
17 * @author Addshore
18 * @since 1.27
19 */
20 class WatchedItemStore implements WatchedItemStoreInterface, StatsdAwareInterface {
21
22 /**
23 * @var ILBFactory
24 */
25 private $lbFactory;
26
27 /**
28 * @var LoadBalancer
29 */
30 private $loadBalancer;
31
32 /**
33 * @var ReadOnlyMode
34 */
35 private $readOnlyMode;
36
37 /**
38 * @var HashBagOStuff
39 */
40 private $cache;
41
42 /**
43 * @var array[] Looks like $cacheIndex[Namespace ID][Target DB Key][User Id] => 'key'
44 * The index is needed so that on mass changes all relevant items can be un-cached.
45 * For example: Clearing a users watchlist of all items or updating notification timestamps
46 * for all users watching a single target.
47 */
48 private $cacheIndex = [];
49
50 /**
51 * @var callable|null
52 */
53 private $deferredUpdatesAddCallableUpdateCallback;
54
55 /**
56 * @var callable|null
57 */
58 private $revisionGetTimestampFromIdCallback;
59
60 /**
61 * @var int
62 */
63 private $updateRowsPerQuery;
64
65 /**
66 * @var StatsdDataFactoryInterface
67 */
68 private $stats;
69
70 /**
71 * @param ILBFactory $lbFactory
72 * @param HashBagOStuff $cache
73 * @param ReadOnlyMode $readOnlyMode
74 * @param int $updateRowsPerQuery
75 */
76 public function __construct(
77 ILBFactory $lbFactory,
78 HashBagOStuff $cache,
79 ReadOnlyMode $readOnlyMode,
80 $updateRowsPerQuery
81 ) {
82 $this->lbFactory = $lbFactory;
83 $this->loadBalancer = $lbFactory->getMainLB();
84 $this->cache = $cache;
85 $this->readOnlyMode = $readOnlyMode;
86 $this->stats = new NullStatsdDataFactory();
87 $this->deferredUpdatesAddCallableUpdateCallback =
88 [ DeferredUpdates::class, 'addCallableUpdate' ];
89 $this->revisionGetTimestampFromIdCallback =
90 [ Revision::class, 'getTimestampFromId' ];
91 $this->updateRowsPerQuery = $updateRowsPerQuery;
92 }
93
94 /**
95 * @param StatsdDataFactoryInterface $stats
96 */
97 public function setStatsdDataFactory( StatsdDataFactoryInterface $stats ) {
98 $this->stats = $stats;
99 }
100
101 /**
102 * Overrides the DeferredUpdates::addCallableUpdate callback
103 * This is intended for use while testing and will fail if MW_PHPUNIT_TEST is not defined.
104 *
105 * @param callable $callback
106 *
107 * @see DeferredUpdates::addCallableUpdate for callback signiture
108 *
109 * @return ScopedCallback to reset the overridden value
110 * @throws MWException
111 */
112 public function overrideDeferredUpdatesAddCallableUpdateCallback( callable $callback ) {
113 if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
114 throw new MWException(
115 'Cannot override DeferredUpdates::addCallableUpdate callback in operation.'
116 );
117 }
118 $previousValue = $this->deferredUpdatesAddCallableUpdateCallback;
119 $this->deferredUpdatesAddCallableUpdateCallback = $callback;
120 return new ScopedCallback( function () use ( $previousValue ) {
121 $this->deferredUpdatesAddCallableUpdateCallback = $previousValue;
122 } );
123 }
124
125 /**
126 * Overrides the Revision::getTimestampFromId callback
127 * This is intended for use while testing and will fail if MW_PHPUNIT_TEST is not defined.
128 *
129 * @param callable $callback
130 * @see Revision::getTimestampFromId for callback signiture
131 *
132 * @return ScopedCallback to reset the overridden value
133 * @throws MWException
134 */
135 public function overrideRevisionGetTimestampFromIdCallback( callable $callback ) {
136 if ( !defined( 'MW_PHPUNIT_TEST' ) ) {
137 throw new MWException(
138 'Cannot override Revision::getTimestampFromId callback in operation.'
139 );
140 }
141 $previousValue = $this->revisionGetTimestampFromIdCallback;
142 $this->revisionGetTimestampFromIdCallback = $callback;
143 return new ScopedCallback( function () use ( $previousValue ) {
144 $this->revisionGetTimestampFromIdCallback = $previousValue;
145 } );
146 }
147
148 private function getCacheKey( User $user, LinkTarget $target ) {
149 return $this->cache->makeKey(
150 (string)$target->getNamespace(),
151 $target->getDBkey(),
152 (string)$user->getId()
153 );
154 }
155
156 private function cache( WatchedItem $item ) {
157 $user = $item->getUser();
158 $target = $item->getLinkTarget();
159 $key = $this->getCacheKey( $user, $target );
160 $this->cache->set( $key, $item );
161 $this->cacheIndex[$target->getNamespace()][$target->getDBkey()][$user->getId()] = $key;
162 $this->stats->increment( 'WatchedItemStore.cache' );
163 }
164
165 private function uncache( User $user, LinkTarget $target ) {
166 $this->cache->delete( $this->getCacheKey( $user, $target ) );
167 unset( $this->cacheIndex[$target->getNamespace()][$target->getDBkey()][$user->getId()] );
168 $this->stats->increment( 'WatchedItemStore.uncache' );
169 }
170
171 private function uncacheLinkTarget( LinkTarget $target ) {
172 $this->stats->increment( 'WatchedItemStore.uncacheLinkTarget' );
173 if ( !isset( $this->cacheIndex[$target->getNamespace()][$target->getDBkey()] ) ) {
174 return;
175 }
176 foreach ( $this->cacheIndex[$target->getNamespace()][$target->getDBkey()] as $key ) {
177 $this->stats->increment( 'WatchedItemStore.uncacheLinkTarget.items' );
178 $this->cache->delete( $key );
179 }
180 }
181
182 private function uncacheUser( User $user ) {
183 $this->stats->increment( 'WatchedItemStore.uncacheUser' );
184 foreach ( $this->cacheIndex as $ns => $dbKeyArray ) {
185 foreach ( $dbKeyArray as $dbKey => $userArray ) {
186 if ( isset( $userArray[$user->getId()] ) ) {
187 $this->stats->increment( 'WatchedItemStore.uncacheUser.items' );
188 $this->cache->delete( $userArray[$user->getId()] );
189 }
190 }
191 }
192 }
193
194 /**
195 * @param User $user
196 * @param LinkTarget $target
197 *
198 * @return WatchedItem|false
199 */
200 private function getCached( User $user, LinkTarget $target ) {
201 return $this->cache->get( $this->getCacheKey( $user, $target ) );
202 }
203
204 /**
205 * Return an array of conditions to select or update the appropriate database
206 * row.
207 *
208 * @param User $user
209 * @param LinkTarget $target
210 *
211 * @return array
212 */
213 private function dbCond( User $user, LinkTarget $target ) {
214 return [
215 'wl_user' => $user->getId(),
216 'wl_namespace' => $target->getNamespace(),
217 'wl_title' => $target->getDBkey(),
218 ];
219 }
220
221 /**
222 * @param int $dbIndex DB_MASTER or DB_REPLICA
223 *
224 * @return IDatabase
225 * @throws MWException
226 */
227 private function getConnectionRef( $dbIndex ) {
228 return $this->loadBalancer->getConnectionRef( $dbIndex, [ 'watchlist' ] );
229 }
230
231 /**
232 * Deletes ALL watched items for the given user when under
233 * $updateRowsPerQuery entries exist.
234 *
235 * @since 1.30
236 *
237 * @param User $user
238 *
239 * @return bool true on success, false when too many items are watched
240 */
241 public function clearUserWatchedItems( User $user ) {
242 if ( $this->countWatchedItems( $user ) > $this->updateRowsPerQuery ) {
243 return false;
244 }
245
246 $dbw = $this->loadBalancer->getConnectionRef( DB_MASTER );
247 $dbw->delete(
248 'watchlist',
249 [ 'wl_user' => $user->getId() ],
250 __METHOD__
251 );
252 $this->uncacheAllItemsForUser( $user );
253
254 return true;
255 }
256
257 private function uncacheAllItemsForUser( User $user ) {
258 $userId = $user->getId();
259 foreach ( $this->cacheIndex as $ns => $dbKeyIndex ) {
260 foreach ( $dbKeyIndex as $dbKey => $userIndex ) {
261 if ( array_key_exists( $userId, $userIndex ) ) {
262 $this->cache->delete( $userIndex[$userId] );
263 unset( $this->cacheIndex[$ns][$dbKey][$userId] );
264 }
265 }
266 }
267
268 // Cleanup empty cache keys
269 foreach ( $this->cacheIndex as $ns => $dbKeyIndex ) {
270 foreach ( $dbKeyIndex as $dbKey => $userIndex ) {
271 if ( empty( $this->cacheIndex[$ns][$dbKey] ) ) {
272 unset( $this->cacheIndex[$ns][$dbKey] );
273 }
274 }
275 if ( empty( $this->cacheIndex[$ns] ) ) {
276 unset( $this->cacheIndex[$ns] );
277 }
278 }
279 }
280
281 /**
282 * Queues a job that will clear the users watchlist using the Job Queue.
283 *
284 * @since 1.31
285 *
286 * @param User $user
287 */
288 public function clearUserWatchedItemsUsingJobQueue( User $user ) {
289 $job = ClearUserWatchlistJob::newForUser( $user, $this->getMaxId() );
290 // TODO inject me.
291 JobQueueGroup::singleton()->push( $job );
292 }
293
294 /**
295 * @since 1.31
296 * @return int The maximum current wl_id
297 */
298 public function getMaxId() {
299 $dbr = $this->getConnectionRef( DB_REPLICA );
300 return (int)$dbr->selectField(
301 'watchlist',
302 'MAX(wl_id)',
303 '',
304 __METHOD__
305 );
306 }
307
308 /**
309 * @since 1.31
310 * @param User $user
311 * @return int
312 */
313 public function countWatchedItems( User $user ) {
314 $dbr = $this->getConnectionRef( DB_REPLICA );
315 $return = (int)$dbr->selectField(
316 'watchlist',
317 'COUNT(*)',
318 [
319 'wl_user' => $user->getId()
320 ],
321 __METHOD__
322 );
323
324 return $return;
325 }
326
327 /**
328 * @since 1.27
329 * @param LinkTarget $target
330 * @return int
331 */
332 public function countWatchers( LinkTarget $target ) {
333 $dbr = $this->getConnectionRef( DB_REPLICA );
334 $return = (int)$dbr->selectField(
335 'watchlist',
336 'COUNT(*)',
337 [
338 'wl_namespace' => $target->getNamespace(),
339 'wl_title' => $target->getDBkey(),
340 ],
341 __METHOD__
342 );
343
344 return $return;
345 }
346
347 /**
348 * @since 1.27
349 * @param LinkTarget $target
350 * @param string|int $threshold
351 * @return int
352 */
353 public function countVisitingWatchers( LinkTarget $target, $threshold ) {
354 $dbr = $this->getConnectionRef( DB_REPLICA );
355 $visitingWatchers = (int)$dbr->selectField(
356 'watchlist',
357 'COUNT(*)',
358 [
359 'wl_namespace' => $target->getNamespace(),
360 'wl_title' => $target->getDBkey(),
361 'wl_notificationtimestamp >= ' .
362 $dbr->addQuotes( $dbr->timestamp( $threshold ) ) .
363 ' OR wl_notificationtimestamp IS NULL'
364 ],
365 __METHOD__
366 );
367
368 return $visitingWatchers;
369 }
370
371 /**
372 * @param User $user
373 * @param TitleValue[] $titles
374 * @return bool
375 * @throws MWException
376 */
377 public function removeWatchBatchForUser( User $user, array $titles ) {
378 if ( $this->readOnlyMode->isReadOnly() ) {
379 return false;
380 }
381 if ( $user->isAnon() ) {
382 return false;
383 }
384 if ( !$titles ) {
385 return true;
386 }
387
388 $rows = $this->getTitleDbKeysGroupedByNamespace( $titles );
389 $this->uncacheTitlesForUser( $user, $titles );
390
391 $dbw = $this->getConnectionRef( DB_MASTER );
392 $ticket = count( $titles ) > $this->updateRowsPerQuery ?
393 $this->lbFactory->getEmptyTransactionTicket( __METHOD__ ) : null;
394 $affectedRows = 0;
395
396 // Batch delete items per namespace.
397 foreach ( $rows as $namespace => $namespaceTitles ) {
398 $rowBatches = array_chunk( $namespaceTitles, $this->updateRowsPerQuery );
399 foreach ( $rowBatches as $toDelete ) {
400 $dbw->delete( 'watchlist', [
401 'wl_user' => $user->getId(),
402 'wl_namespace' => $namespace,
403 'wl_title' => $toDelete
404 ], __METHOD__ );
405 $affectedRows += $dbw->affectedRows();
406 if ( $ticket ) {
407 $this->lbFactory->commitAndWaitForReplication( __METHOD__, $ticket );
408 }
409 }
410 }
411
412 return (bool)$affectedRows;
413 }
414
415 /**
416 * @since 1.27
417 * @param LinkTarget[] $targets
418 * @param array $options
419 * @return array
420 */
421 public function countWatchersMultiple( array $targets, array $options = [] ) {
422 $dbOptions = [ 'GROUP BY' => [ 'wl_namespace', 'wl_title' ] ];
423
424 $dbr = $this->getConnectionRef( DB_REPLICA );
425
426 if ( array_key_exists( 'minimumWatchers', $options ) ) {
427 $dbOptions['HAVING'] = 'COUNT(*) >= ' . (int)$options['minimumWatchers'];
428 }
429
430 $lb = new LinkBatch( $targets );
431 $res = $dbr->select(
432 'watchlist',
433 [ 'wl_title', 'wl_namespace', 'watchers' => 'COUNT(*)' ],
434 [ $lb->constructSet( 'wl', $dbr ) ],
435 __METHOD__,
436 $dbOptions
437 );
438
439 $watchCounts = [];
440 foreach ( $targets as $linkTarget ) {
441 $watchCounts[$linkTarget->getNamespace()][$linkTarget->getDBkey()] = 0;
442 }
443
444 foreach ( $res as $row ) {
445 $watchCounts[$row->wl_namespace][$row->wl_title] = (int)$row->watchers;
446 }
447
448 return $watchCounts;
449 }
450
451 /**
452 * @since 1.27
453 * @param array $targetsWithVisitThresholds
454 * @param int|null $minimumWatchers
455 * @return array
456 */
457 public function countVisitingWatchersMultiple(
458 array $targetsWithVisitThresholds,
459 $minimumWatchers = null
460 ) {
461 if ( $targetsWithVisitThresholds === [] ) {
462 // No titles requested => no results returned
463 return [];
464 }
465
466 $dbr = $this->getConnectionRef( DB_REPLICA );
467
468 $conds = $this->getVisitingWatchersCondition( $dbr, $targetsWithVisitThresholds );
469
470 $dbOptions = [ 'GROUP BY' => [ 'wl_namespace', 'wl_title' ] ];
471 if ( $minimumWatchers !== null ) {
472 $dbOptions['HAVING'] = 'COUNT(*) >= ' . (int)$minimumWatchers;
473 }
474 $res = $dbr->select(
475 'watchlist',
476 [ 'wl_namespace', 'wl_title', 'watchers' => 'COUNT(*)' ],
477 $conds,
478 __METHOD__,
479 $dbOptions
480 );
481
482 $watcherCounts = [];
483 foreach ( $targetsWithVisitThresholds as list( $target ) ) {
484 /* @var LinkTarget $target */
485 $watcherCounts[$target->getNamespace()][$target->getDBkey()] = 0;
486 }
487
488 foreach ( $res as $row ) {
489 $watcherCounts[$row->wl_namespace][$row->wl_title] = (int)$row->watchers;
490 }
491
492 return $watcherCounts;
493 }
494
495 /**
496 * Generates condition for the query used in a batch count visiting watchers.
497 *
498 * @param IDatabase $db
499 * @param array $targetsWithVisitThresholds array of pairs (LinkTarget, last visit threshold)
500 * @return string
501 */
502 private function getVisitingWatchersCondition(
503 IDatabase $db,
504 array $targetsWithVisitThresholds
505 ) {
506 $missingTargets = [];
507 $namespaceConds = [];
508 foreach ( $targetsWithVisitThresholds as list( $target, $threshold ) ) {
509 if ( $threshold === null ) {
510 $missingTargets[] = $target;
511 continue;
512 }
513 /* @var LinkTarget $target */
514 $namespaceConds[$target->getNamespace()][] = $db->makeList( [
515 'wl_title = ' . $db->addQuotes( $target->getDBkey() ),
516 $db->makeList( [
517 'wl_notificationtimestamp >= ' . $db->addQuotes( $db->timestamp( $threshold ) ),
518 'wl_notificationtimestamp IS NULL'
519 ], LIST_OR )
520 ], LIST_AND );
521 }
522
523 $conds = [];
524 foreach ( $namespaceConds as $namespace => $pageConds ) {
525 $conds[] = $db->makeList( [
526 'wl_namespace = ' . $namespace,
527 '(' . $db->makeList( $pageConds, LIST_OR ) . ')'
528 ], LIST_AND );
529 }
530
531 if ( $missingTargets ) {
532 $lb = new LinkBatch( $missingTargets );
533 $conds[] = $lb->constructSet( 'wl', $db );
534 }
535
536 return $db->makeList( $conds, LIST_OR );
537 }
538
539 /**
540 * @since 1.27
541 * @param User $user
542 * @param LinkTarget $target
543 * @return bool
544 */
545 public function getWatchedItem( User $user, LinkTarget $target ) {
546 if ( $user->isAnon() ) {
547 return false;
548 }
549
550 $cached = $this->getCached( $user, $target );
551 if ( $cached ) {
552 $this->stats->increment( 'WatchedItemStore.getWatchedItem.cached' );
553 return $cached;
554 }
555 $this->stats->increment( 'WatchedItemStore.getWatchedItem.load' );
556 return $this->loadWatchedItem( $user, $target );
557 }
558
559 /**
560 * @since 1.27
561 * @param User $user
562 * @param LinkTarget $target
563 * @return WatchedItem|bool
564 */
565 public function loadWatchedItem( User $user, LinkTarget $target ) {
566 // Only loggedin user can have a watchlist
567 if ( $user->isAnon() ) {
568 return false;
569 }
570
571 $dbr = $this->getConnectionRef( DB_REPLICA );
572 $row = $dbr->selectRow(
573 'watchlist',
574 'wl_notificationtimestamp',
575 $this->dbCond( $user, $target ),
576 __METHOD__
577 );
578
579 if ( !$row ) {
580 return false;
581 }
582
583 $item = new WatchedItem(
584 $user,
585 $target,
586 wfTimestampOrNull( TS_MW, $row->wl_notificationtimestamp )
587 );
588 $this->cache( $item );
589
590 return $item;
591 }
592
593 /**
594 * @since 1.27
595 * @param User $user
596 * @param array $options
597 * @return WatchedItem[]
598 */
599 public function getWatchedItemsForUser( User $user, array $options = [] ) {
600 $options += [ 'forWrite' => false ];
601
602 $dbOptions = [];
603 if ( array_key_exists( 'sort', $options ) ) {
604 Assert::parameter(
605 ( in_array( $options['sort'], [ self::SORT_ASC, self::SORT_DESC ] ) ),
606 '$options[\'sort\']',
607 'must be SORT_ASC or SORT_DESC'
608 );
609 $dbOptions['ORDER BY'] = [
610 "wl_namespace {$options['sort']}",
611 "wl_title {$options['sort']}"
612 ];
613 }
614 $db = $this->getConnectionRef( $options['forWrite'] ? DB_MASTER : DB_REPLICA );
615
616 $res = $db->select(
617 'watchlist',
618 [ 'wl_namespace', 'wl_title', 'wl_notificationtimestamp' ],
619 [ 'wl_user' => $user->getId() ],
620 __METHOD__,
621 $dbOptions
622 );
623
624 $watchedItems = [];
625 foreach ( $res as $row ) {
626 // @todo: Should we add these to the process cache?
627 $watchedItems[] = new WatchedItem(
628 $user,
629 new TitleValue( (int)$row->wl_namespace, $row->wl_title ),
630 $row->wl_notificationtimestamp
631 );
632 }
633
634 return $watchedItems;
635 }
636
637 /**
638 * @since 1.27
639 * @param User $user
640 * @param LinkTarget $target
641 * @return bool
642 */
643 public function isWatched( User $user, LinkTarget $target ) {
644 return (bool)$this->getWatchedItem( $user, $target );
645 }
646
647 /**
648 * @since 1.27
649 * @param User $user
650 * @param LinkTarget[] $targets
651 * @return array
652 */
653 public function getNotificationTimestampsBatch( User $user, array $targets ) {
654 $timestamps = [];
655 foreach ( $targets as $target ) {
656 $timestamps[$target->getNamespace()][$target->getDBkey()] = false;
657 }
658
659 if ( $user->isAnon() ) {
660 return $timestamps;
661 }
662
663 $targetsToLoad = [];
664 foreach ( $targets as $target ) {
665 $cachedItem = $this->getCached( $user, $target );
666 if ( $cachedItem ) {
667 $timestamps[$target->getNamespace()][$target->getDBkey()] =
668 $cachedItem->getNotificationTimestamp();
669 } else {
670 $targetsToLoad[] = $target;
671 }
672 }
673
674 if ( !$targetsToLoad ) {
675 return $timestamps;
676 }
677
678 $dbr = $this->getConnectionRef( DB_REPLICA );
679
680 $lb = new LinkBatch( $targetsToLoad );
681 $res = $dbr->select(
682 'watchlist',
683 [ 'wl_namespace', 'wl_title', 'wl_notificationtimestamp' ],
684 [
685 $lb->constructSet( 'wl', $dbr ),
686 'wl_user' => $user->getId(),
687 ],
688 __METHOD__
689 );
690
691 foreach ( $res as $row ) {
692 $timestamps[$row->wl_namespace][$row->wl_title] =
693 wfTimestampOrNull( TS_MW, $row->wl_notificationtimestamp );
694 }
695
696 return $timestamps;
697 }
698
699 /**
700 * @since 1.27
701 * @param User $user
702 * @param LinkTarget $target
703 * @throws MWException
704 */
705 public function addWatch( User $user, LinkTarget $target ) {
706 $this->addWatchBatchForUser( $user, [ $target ] );
707 }
708
709 /**
710 * @since 1.27
711 * @param User $user
712 * @param LinkTarget[] $targets
713 * @return bool
714 * @throws MWException
715 */
716 public function addWatchBatchForUser( User $user, array $targets ) {
717 if ( $this->readOnlyMode->isReadOnly() ) {
718 return false;
719 }
720 // Only logged-in user can have a watchlist
721 if ( $user->isAnon() ) {
722 return false;
723 }
724
725 if ( !$targets ) {
726 return true;
727 }
728
729 $rows = [];
730 $items = [];
731 foreach ( $targets as $target ) {
732 $rows[] = [
733 'wl_user' => $user->getId(),
734 'wl_namespace' => $target->getNamespace(),
735 'wl_title' => $target->getDBkey(),
736 'wl_notificationtimestamp' => null,
737 ];
738 $items[] = new WatchedItem(
739 $user,
740 $target,
741 null
742 );
743 $this->uncache( $user, $target );
744 }
745
746 $dbw = $this->getConnectionRef( DB_MASTER );
747 $ticket = count( $targets ) > $this->updateRowsPerQuery ?
748 $this->lbFactory->getEmptyTransactionTicket( __METHOD__ ) : null;
749 $affectedRows = 0;
750 $rowBatches = array_chunk( $rows, $this->updateRowsPerQuery );
751 foreach ( $rowBatches as $toInsert ) {
752 // Use INSERT IGNORE to avoid overwriting the notification timestamp
753 // if there's already an entry for this page
754 $dbw->insert( 'watchlist', $toInsert, __METHOD__, 'IGNORE' );
755 $affectedRows += $dbw->affectedRows();
756 if ( $ticket ) {
757 $this->lbFactory->commitAndWaitForReplication( __METHOD__, $ticket );
758 }
759 }
760 // Update process cache to ensure skin doesn't claim that the current
761 // page is unwatched in the response of action=watch itself (T28292).
762 // This would otherwise be re-queried from a replica by isWatched().
763 foreach ( $items as $item ) {
764 $this->cache( $item );
765 }
766
767 return (bool)$affectedRows;
768 }
769
770 /**
771 * @since 1.27
772 * @param User $user
773 * @param LinkTarget $target
774 * @return bool
775 * @throws MWException
776 */
777 public function removeWatch( User $user, LinkTarget $target ) {
778 return $this->removeWatchBatchForUser( $user, [ $target ] );
779 }
780
781 /**
782 * @since 1.27
783 * @param User $user
784 * @param string|int $timestamp
785 * @param LinkTarget[] $targets
786 * @return bool
787 */
788 public function setNotificationTimestampsForUser( User $user, $timestamp, array $targets = [] ) {
789 // Only loggedin user can have a watchlist
790 if ( $user->isAnon() ) {
791 return false;
792 }
793
794 $dbw = $this->getConnectionRef( DB_MASTER );
795
796 $conds = [ 'wl_user' => $user->getId() ];
797 if ( $targets ) {
798 $batch = new LinkBatch( $targets );
799 $conds[] = $batch->constructSet( 'wl', $dbw );
800 }
801
802 if ( $timestamp !== null ) {
803 $timestamp = $dbw->timestamp( $timestamp );
804 }
805
806 $success = $dbw->update(
807 'watchlist',
808 [ 'wl_notificationtimestamp' => $timestamp ],
809 $conds,
810 __METHOD__
811 );
812
813 $this->uncacheUser( $user );
814
815 return $success;
816 }
817
818 public function resetAllNotificationTimestampsForUser( User $user ) {
819 // Only loggedin user can have a watchlist
820 if ( $user->isAnon() ) {
821 return;
822 }
823
824 // If the page is watched by the user (or may be watched), update the timestamp
825 $job = new ClearWatchlistNotificationsJob(
826 $user->getUserPage(),
827 [ 'userId' => $user->getId(), 'casTime' => time() ]
828 );
829
830 // Try to run this post-send
831 // Calls DeferredUpdates::addCallableUpdate in normal operation
832 call_user_func(
833 $this->deferredUpdatesAddCallableUpdateCallback,
834 function () use ( $job ) {
835 $job->run();
836 }
837 );
838 }
839
840 /**
841 * @since 1.27
842 * @param User $editor
843 * @param LinkTarget $target
844 * @param string|int $timestamp
845 * @return int[]
846 */
847 public function updateNotificationTimestamp( User $editor, LinkTarget $target, $timestamp ) {
848 $dbw = $this->getConnectionRef( DB_MASTER );
849 $uids = $dbw->selectFieldValues(
850 'watchlist',
851 'wl_user',
852 [
853 'wl_user != ' . intval( $editor->getId() ),
854 'wl_namespace' => $target->getNamespace(),
855 'wl_title' => $target->getDBkey(),
856 'wl_notificationtimestamp IS NULL',
857 ],
858 __METHOD__
859 );
860
861 $watchers = array_map( 'intval', $uids );
862 if ( $watchers ) {
863 // Update wl_notificationtimestamp for all watching users except the editor
864 $fname = __METHOD__;
865 DeferredUpdates::addCallableUpdate(
866 function () use ( $timestamp, $watchers, $target, $fname ) {
867 $dbw = $this->getConnectionRef( DB_MASTER );
868 $ticket = $this->lbFactory->getEmptyTransactionTicket( $fname );
869
870 $watchersChunks = array_chunk( $watchers, $this->updateRowsPerQuery );
871 foreach ( $watchersChunks as $watchersChunk ) {
872 $dbw->update( 'watchlist',
873 [ /* SET */
874 'wl_notificationtimestamp' => $dbw->timestamp( $timestamp )
875 ], [ /* WHERE - TODO Use wl_id T130067 */
876 'wl_user' => $watchersChunk,
877 'wl_namespace' => $target->getNamespace(),
878 'wl_title' => $target->getDBkey(),
879 ], $fname
880 );
881 if ( count( $watchersChunks ) > 1 ) {
882 $this->lbFactory->commitAndWaitForReplication(
883 $fname, $ticket, [ 'domain' => $dbw->getDomainID() ]
884 );
885 }
886 }
887 $this->uncacheLinkTarget( $target );
888 },
889 DeferredUpdates::POSTSEND,
890 $dbw
891 );
892 }
893
894 return $watchers;
895 }
896
897 /**
898 * @since 1.27
899 * @param User $user
900 * @param Title $title
901 * @param string $force
902 * @param int $oldid
903 * @return bool
904 */
905 public function resetNotificationTimestamp( User $user, Title $title, $force = '', $oldid = 0 ) {
906 // Only loggedin user can have a watchlist
907 if ( $this->readOnlyMode->isReadOnly() || $user->isAnon() ) {
908 return false;
909 }
910
911 $item = null;
912 if ( $force != 'force' ) {
913 $item = $this->loadWatchedItem( $user, $title );
914 if ( !$item || $item->getNotificationTimestamp() === null ) {
915 return false;
916 }
917 }
918
919 // If the page is watched by the user (or may be watched), update the timestamp
920 $job = new ActivityUpdateJob(
921 $title,
922 [
923 'type' => 'updateWatchlistNotification',
924 'userid' => $user->getId(),
925 'notifTime' => $this->getNotificationTimestamp( $user, $title, $item, $force, $oldid ),
926 'curTime' => time()
927 ]
928 );
929
930 // Try to run this post-send
931 // Calls DeferredUpdates::addCallableUpdate in normal operation
932 call_user_func(
933 $this->deferredUpdatesAddCallableUpdateCallback,
934 function () use ( $job ) {
935 $job->run();
936 }
937 );
938
939 $this->uncache( $user, $title );
940
941 return true;
942 }
943
944 private function getNotificationTimestamp( User $user, Title $title, $item, $force, $oldid ) {
945 if ( !$oldid ) {
946 // No oldid given, assuming latest revision; clear the timestamp.
947 return null;
948 }
949
950 if ( !$title->getNextRevisionID( $oldid ) ) {
951 // Oldid given and is the latest revision for this title; clear the timestamp.
952 return null;
953 }
954
955 if ( $item === null ) {
956 $item = $this->loadWatchedItem( $user, $title );
957 }
958
959 if ( !$item ) {
960 // This can only happen if $force is enabled.
961 return null;
962 }
963
964 // Oldid given and isn't the latest; update the timestamp.
965 // This will result in no further notification emails being sent!
966 // Calls Revision::getTimestampFromId in normal operation
967 $notificationTimestamp = call_user_func(
968 $this->revisionGetTimestampFromIdCallback,
969 $title,
970 $oldid
971 );
972
973 // We need to go one second to the future because of various strict comparisons
974 // throughout the codebase
975 $ts = new MWTimestamp( $notificationTimestamp );
976 $ts->timestamp->add( new DateInterval( 'PT1S' ) );
977 $notificationTimestamp = $ts->getTimestamp( TS_MW );
978
979 if ( $notificationTimestamp < $item->getNotificationTimestamp() ) {
980 if ( $force != 'force' ) {
981 return false;
982 } else {
983 // This is a little silly…
984 return $item->getNotificationTimestamp();
985 }
986 }
987
988 return $notificationTimestamp;
989 }
990
991 /**
992 * @since 1.27
993 * @param User $user
994 * @param int|null $unreadLimit
995 * @return int|bool
996 */
997 public function countUnreadNotifications( User $user, $unreadLimit = null ) {
998 $queryOptions = [];
999 if ( $unreadLimit !== null ) {
1000 $unreadLimit = (int)$unreadLimit;
1001 $queryOptions['LIMIT'] = $unreadLimit;
1002 }
1003
1004 $dbr = $this->getConnectionRef( DB_REPLICA );
1005 $rowCount = $dbr->selectRowCount(
1006 'watchlist',
1007 '1',
1008 [
1009 'wl_user' => $user->getId(),
1010 'wl_notificationtimestamp IS NOT NULL',
1011 ],
1012 __METHOD__,
1013 $queryOptions
1014 );
1015
1016 if ( !isset( $unreadLimit ) ) {
1017 return $rowCount;
1018 }
1019
1020 if ( $rowCount >= $unreadLimit ) {
1021 return true;
1022 }
1023
1024 return $rowCount;
1025 }
1026
1027 /**
1028 * @since 1.27
1029 * @param LinkTarget $oldTarget
1030 * @param LinkTarget $newTarget
1031 */
1032 public function duplicateAllAssociatedEntries( LinkTarget $oldTarget, LinkTarget $newTarget ) {
1033 $oldTarget = Title::newFromLinkTarget( $oldTarget );
1034 $newTarget = Title::newFromLinkTarget( $newTarget );
1035
1036 $this->duplicateEntry( $oldTarget->getSubjectPage(), $newTarget->getSubjectPage() );
1037 $this->duplicateEntry( $oldTarget->getTalkPage(), $newTarget->getTalkPage() );
1038 }
1039
1040 /**
1041 * @since 1.27
1042 * @param LinkTarget $oldTarget
1043 * @param LinkTarget $newTarget
1044 */
1045 public function duplicateEntry( LinkTarget $oldTarget, LinkTarget $newTarget ) {
1046 $dbw = $this->getConnectionRef( DB_MASTER );
1047
1048 $result = $dbw->select(
1049 'watchlist',
1050 [ 'wl_user', 'wl_notificationtimestamp' ],
1051 [
1052 'wl_namespace' => $oldTarget->getNamespace(),
1053 'wl_title' => $oldTarget->getDBkey(),
1054 ],
1055 __METHOD__,
1056 [ 'FOR UPDATE' ]
1057 );
1058
1059 $newNamespace = $newTarget->getNamespace();
1060 $newDBkey = $newTarget->getDBkey();
1061
1062 # Construct array to replace into the watchlist
1063 $values = [];
1064 foreach ( $result as $row ) {
1065 $values[] = [
1066 'wl_user' => $row->wl_user,
1067 'wl_namespace' => $newNamespace,
1068 'wl_title' => $newDBkey,
1069 'wl_notificationtimestamp' => $row->wl_notificationtimestamp,
1070 ];
1071 }
1072
1073 if ( !empty( $values ) ) {
1074 # Perform replace
1075 # Note that multi-row replace is very efficient for MySQL but may be inefficient for
1076 # some other DBMSes, mostly due to poor simulation by us
1077 $dbw->replace(
1078 'watchlist',
1079 [ [ 'wl_user', 'wl_namespace', 'wl_title' ] ],
1080 $values,
1081 __METHOD__
1082 );
1083 }
1084 }
1085
1086 /**
1087 * @param TitleValue[] $titles
1088 * @return array
1089 */
1090 private function getTitleDbKeysGroupedByNamespace( array $titles ) {
1091 $rows = [];
1092 foreach ( $titles as $title ) {
1093 // Group titles by namespace.
1094 $rows[ $title->getNamespace() ][] = $title->getDBkey();
1095 }
1096 return $rows;
1097 }
1098
1099 /**
1100 * @param User $user
1101 * @param Title[] $titles
1102 */
1103 private function uncacheTitlesForUser( User $user, array $titles ) {
1104 foreach ( $titles as $title ) {
1105 $this->uncache( $user, $title );
1106 }
1107 }
1108
1109 }