Add WRITE_SYNC flag to BagOStuff::set()/merge()
[lhc/web/wiklou.git] / includes / objectcache / MultiWriteBagOStuff.php
1 <?php
2 /**
3 * Wrapper for object caching in different caches.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Cache
22 */
23
24 /**
25 * A cache class that replicates all writes to multiple child caches. Reads
26 * are implemented by reading from the caches in the order they are given in
27 * the configuration until a cache gives a positive result.
28 *
29 * @ingroup Cache
30 */
31 class MultiWriteBagOStuff extends BagOStuff {
32 /** @var BagOStuff[] */
33 protected $caches;
34 /** @var bool Use async secondary writes */
35 protected $asyncWrites = false;
36 /** @var callback|null */
37 protected $asyncHandler;
38
39 /** Idiom for "write to all backends" */
40 const ALL = INF;
41
42 const UPGRADE_TTL = 3600; // TTL when a key is copied to a higher cache tier
43
44 /**
45 * $params include:
46 * - caches: A numbered array of either ObjectFactory::getObjectFromSpec
47 * arrays yeilding BagOStuff objects or direct BagOStuff objects.
48 * If using the former, the 'args' field *must* be set.
49 * The first cache is the primary one, being the first to
50 * be read in the fallback chain. Writes happen to all stores
51 * in the order they are defined. However, lock()/unlock() calls
52 * only use the primary store.
53 * - replication: Either 'sync' or 'async'. This controls whether writes
54 * to secondary stores are deferred when possible. Async writes
55 * require setting 'asyncCallback'. HHVM register_postsend_function() function.
56 * Async writes can increase the chance of some race conditions
57 * or cause keys to expire seconds later than expected. It is
58 * safe to use for modules when cached values: are immutable,
59 * invalidation uses logical TTLs, invalidation uses etag/timestamp
60 * validation against the DB, or merge() is used to handle races.
61 * - asyncHandler: callable that takes a callback and runs it after the
62 * current web request ends. In CLI mode, it should run it immediately.
63 * @param array $params
64 * @throws InvalidArgumentException
65 */
66 public function __construct( $params ) {
67 parent::__construct( $params );
68
69 if ( empty( $params['caches'] ) || !is_array( $params['caches'] ) ) {
70 throw new InvalidArgumentException(
71 __METHOD__ . ': "caches" parameter must be an array of caches'
72 );
73 }
74
75 $this->caches = array();
76 foreach ( $params['caches'] as $cacheInfo ) {
77 if ( $cacheInfo instanceof BagOStuff ) {
78 $this->caches[] = $cacheInfo;
79 } else {
80 if ( !isset( $cacheInfo['args'] ) ) {
81 // B/C for when $cacheInfo was for ObjectCache::newFromParams().
82 // Callers intenting this to be for ObjectFactory::getObjectFromSpec
83 // should have set "args" per the docs above. Doings so avoids extra
84 // (likely harmless) params (factory/class/calls) ending up in "args".
85 $cacheInfo['args'] = array( $cacheInfo );
86 }
87 $this->caches[] = ObjectFactory::getObjectFromSpec( $cacheInfo );
88 }
89 }
90
91 $this->asyncHandler = isset( $params['asyncHandler'] )
92 ? $params['asyncHandler']
93 : null;
94 $this->asyncWrites = (
95 isset( $params['replication'] ) &&
96 $params['replication'] === 'async' &&
97 is_callable( $this->asyncHandler )
98 );
99 }
100
101 public function setDebug( $debug ) {
102 foreach ( $this->caches as $cache ) {
103 $cache->setDebug( $debug );
104 }
105 }
106
107 protected function doGet( $key, $flags = 0 ) {
108 if ( ( $flags & self::READ_LATEST ) == self::READ_LATEST ) {
109 // If the latest write was a delete(), we do NOT want to fallback
110 // to the other tiers and possibly see the old value. Also, this
111 // is used by mergeViaLock(), which only needs to hit the primary.
112 return $this->caches[0]->get( $key, $flags );
113 }
114
115 $misses = 0; // number backends checked
116 $value = false;
117 foreach ( $this->caches as $cache ) {
118 $value = $cache->get( $key, $flags );
119 if ( $value !== false ) {
120 break;
121 }
122 ++$misses;
123 }
124
125 if ( $value !== false
126 && $misses > 0
127 && ( $flags & self::READ_VERIFIED ) == self::READ_VERIFIED
128 ) {
129 $this->doWrite( $misses, $this->asyncWrites, 'set', $key, $value, self::UPGRADE_TTL );
130 }
131
132 return $value;
133 }
134
135 public function set( $key, $value, $exptime = 0, $flags = 0 ) {
136 $asyncWrites = ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC )
137 ? false
138 : $this->asyncWrites;
139
140 return $this->doWrite( self::ALL, $asyncWrites, 'set', $key, $value, $exptime );
141 }
142
143 public function delete( $key ) {
144 return $this->doWrite( self::ALL, $this->asyncWrites, 'delete', $key );
145 }
146
147 public function add( $key, $value, $exptime = 0 ) {
148 return $this->doWrite( self::ALL, $this->asyncWrites, 'add', $key, $value, $exptime );
149 }
150
151 public function incr( $key, $value = 1 ) {
152 return $this->doWrite( self::ALL, $this->asyncWrites, 'incr', $key, $value );
153 }
154
155 public function decr( $key, $value = 1 ) {
156 return $this->doWrite( self::ALL, $this->asyncWrites, 'decr', $key, $value );
157 }
158
159 public function lock( $key, $timeout = 6, $expiry = 6, $rclass = '' ) {
160 // Only need to lock the first cache; also avoids deadlocks
161 return $this->caches[0]->lock( $key, $timeout, $expiry, $rclass );
162 }
163
164 public function unlock( $key ) {
165 // Only the first cache is locked
166 return $this->caches[0]->unlock( $key );
167 }
168
169 public function getLastError() {
170 return $this->caches[0]->getLastError();
171 }
172
173 public function clearLastError() {
174 $this->caches[0]->clearLastError();
175 }
176
177 /**
178 * Apply a write method to the first $count backing caches
179 *
180 * @param integer $count
181 * @param bool $asyncWrites
182 * @param string $method
183 * @param mixed ...
184 * @return bool
185 */
186 protected function doWrite( $count, $asyncWrites, $method /*, ... */ ) {
187 $ret = true;
188 $args = array_slice( func_get_args(), 3 );
189
190 foreach ( $this->caches as $i => $cache ) {
191 if ( $i >= $count ) {
192 break; // ignore the lower tiers
193 }
194
195 if ( $i == 0 || !$asyncWrites ) {
196 // First store or in sync mode: write now and get result
197 if ( !call_user_func_array( array( $cache, $method ), $args ) ) {
198 $ret = false;
199 }
200 } else {
201 // Secondary write in async mode: do not block this HTTP request
202 $logger = $this->logger;
203 call_user_func(
204 $this->asyncHandler,
205 function () use ( $cache, $method, $args, $logger ) {
206 if ( !call_user_func_array( array( $cache, $method ), $args ) ) {
207 $logger->warning( "Async $method op failed" );
208 }
209 }
210 );
211 }
212 }
213
214 return $ret;
215 }
216
217 /**
218 * Delete objects expiring before a certain date.
219 *
220 * Succeed if any of the child caches succeed.
221 * @param string $date
222 * @param bool|callable $progressCallback
223 * @return bool
224 */
225 public function deleteObjectsExpiringBefore( $date, $progressCallback = false ) {
226 $ret = false;
227 foreach ( $this->caches as $cache ) {
228 if ( $cache->deleteObjectsExpiringBefore( $date, $progressCallback ) ) {
229 $ret = true;
230 }
231 }
232
233 return $ret;
234 }
235 }