Merge "Convert EnqueueJob to using DB domains"
[lhc/web/wiklou.git] / tests / phpunit / includes / jobqueue / JobQueueTest.php
1 <?php
2
3 use MediaWiki\MediaWikiServices;
4
5 /**
6 * @group JobQueue
7 * @group medium
8 * @group Database
9 */
class JobQueueTest extends MediaWikiTestCase {
	/**
	 * Queue variants exercised by this test case.
	 *
	 * Each key is the name of the property that holds the queue instance; each value
	 * is the set of options merged over the base configuration in setUp(). Keeping
	 * one table guarantees setUp() and tearDown() always agree on the variant list.
	 *
	 * @var array[]
	 */
	private static $queueVariants = [
		'queueRand' => [ 'order' => 'random', 'claimTTL' => 0 ],
		'queueRandTTL' => [ 'order' => 'random', 'claimTTL' => 10 ],
		'queueTimestamp' => [ 'order' => 'timestamp', 'claimTTL' => 0 ],
		'queueTimestampTTL' => [ 'order' => 'timestamp', 'claimTTL' => 10 ],
		'queueFifo' => [ 'order' => 'fifo', 'claimTTL' => 0 ],
		'queueFifoTTL' => [ 'order' => 'fifo', 'claimTTL' => 10 ],
	];

	protected $key;

	/**
	 * One slot per entry of self::$queueVariants. A slot stays null when
	 * JobQueue::factory() rejected that variant, which makes the tests skip it.
	 *
	 * @var JobQueue|null
	 */
	protected $queueRand, $queueRandTTL, $queueTimestamp, $queueTimestampTTL,
		$queueFifo, $queueFifoTTL;

	/**
	 * Registers the `job` table as used by this test case.
	 *
	 * @param string|null $name
	 * @param array $data
	 * @param string $dataName
	 */
	public function __construct( $name = null, array $data = [], $dataName = '' ) {
		parent::__construct( $name, $data, $dataName );

		$this->tablesUsed[] = 'job';
	}

	/**
	 * Builds one queue per variant on top of a shared base configuration.
	 *
	 * The base configuration is the $wgJobTypeConf entry named by the
	 * `use-jobqueue` CLI argument when given, otherwise JobQueueDBSingle.
	 *
	 * @throws MWException If `use-jobqueue` names an unknown $wgJobTypeConf entry
	 */
	protected function setUp() {
		global $wgJobTypeConf;
		parent::setUp();

		if ( $this->getCliArg( 'use-jobqueue' ) ) {
			$name = $this->getCliArg( 'use-jobqueue' );
			if ( !isset( $wgJobTypeConf[$name] ) ) {
				throw new MWException( "No \$wgJobTypeConf entry for '$name'." );
			}
			$baseConfig = $wgJobTypeConf[$name];
		} else {
			$baseConfig = [ 'class' => JobQueueDBSingle::class ];
		}
		$baseConfig['type'] = 'null';
		$baseConfig['domain'] = WikiMap::getCurrentWikiDbDomain()->getId();

		foreach ( self::$queueVariants as $q => $settings ) {
			try {
				// Variant settings take precedence over the base configuration
				$this->$q = JobQueue::factory( $settings + $baseConfig );
			} catch ( MWException $e ) {
				// Deliberately best-effort: the variant is treated as unsupported and
				// its property stays null, so the tests that need it are skipped.
				// @todo What if it was another error?
			}
		}
	}

	/**
	 * Empties every queue that setUp() managed to create and releases it.
	 */
	protected function tearDown() {
		parent::tearDown();
		foreach ( array_keys( self::$queueVariants ) as $q ) {
			if ( $this->$q ) {
				$this->$q->delete();
			}
			$this->$q = null;
		}
	}

	/**
	 * The queue must report the current wiki ID and DB domain ID.
	 *
	 * @dataProvider provider_queueLists
	 * @covers JobQueue::getWiki
	 */
	public function testGetWiki( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}
		$this->assertEquals( wfWikiID(), $queue->getWiki(), "Proper wiki ID ($desc)" );
		$this->assertEquals(
			WikiMap::getCurrentWikiDbDomain()->getId(),
			$queue->getDomain(),
			"Proper domain ID ($desc)" );
	}

	/**
	 * The queue must report the 'null' job type it was configured with in setUp().
	 *
	 * @dataProvider provider_queueLists
	 * @covers JobQueue::getType
	 */
	public function testGetType( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}
		$this->assertEquals( 'null', $queue->getType(), "Proper job type ($desc)" );
	}

	/**
	 * Walks a queue through push, pop, ack and delete, checking the size and the
	 * acquired-job count after every step.
	 *
	 * @dataProvider provider_queueLists
	 * @covers JobQueue
	 */
	public function testBasicOperations( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		$this->assertNull( $queue->push( $this->newJob() ), "Push worked ($desc)" );
		$this->assertNull( $queue->batchPush( [ $this->newJob() ] ), "Push worked ($desc)" );

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 2, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
		$jobs = iterator_to_array( $queue->getAllQueuedJobs() );
		$this->assertCount( 2, $jobs, "Queue iterator size is correct ($desc)" );

		$job1 = $queue->pop();
		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );

		$queue->flushCaches();
		// The acquired count is only checked for variants flagged as recycling
		if ( $recycles ) {
			$this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" );
		}

		$job2 = $queue->pop();
		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		if ( $recycles ) {
			$this->assertEquals( 2, $queue->getAcquiredCount(), "Active job count ($desc)" );
		}

		$queue->ack( $job1 );

		$queue->flushCaches();
		if ( $recycles ) {
			$this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" );
		}

		$queue->ack( $job2 );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" );

		$this->assertNull( $queue->batchPush( [ $this->newJob(), $this->newJob() ] ),
			"Push worked ($desc)" );
		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->delete();
		$queue->flushCaches();
		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
	}

	/**
	 * Pushing identical jobs that request duplicate removal must leave a single
	 * job in the queue, both within one batch and across batches.
	 *
	 * @dataProvider provider_queueLists
	 * @covers JobQueue
	 */
	public function testBasicDeduplication( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		$this->assertNull(
			$queue->batchPush(
				[ $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() ]
			),
			"Push worked ($desc)" );

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );

		// A second identical batch must not grow the queue either
		$this->assertNull(
			$queue->batchPush(
				[ $this->newDedupedJob(), $this->newDedupedJob(), $this->newDedupedJob() ]
			),
			"Push worked ($desc)"
		);

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 1, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );

		$job1 = $queue->pop();
		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		if ( $recycles ) {
			$this->assertEquals( 1, $queue->getAcquiredCount(), "Active job count ($desc)" );
		}

		$queue->ack( $job1 );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Active job count ($desc)" );
	}

	/**
	 * Re-pushing a job while its twin is claimed must enqueue it again, and
	 * acknowledging the twin must not remove the re-pushed copy.
	 *
	 * @dataProvider provider_queueLists
	 * @covers JobQueue
	 */
	public function testDeduplicationWhileClaimed( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		$job = $this->newDedupedJob();
		$queue->push( $job );

		// De-duplication does not apply to already-claimed jobs
		$j = $queue->pop();
		$queue->push( $job );
		$queue->ack( $j );

		$j = $queue->pop();
		// Make sure ack() of the twin did not delete the sibling data
		$this->assertInstanceOf( NullJob::class, $j );
	}

	/**
	 * Pushes five jobs for each of two root job signatures that differ only by
	 * timestamp, then checks that five of the ten popped jobs are DuplicateJob
	 * instances.
	 *
	 * @dataProvider provider_queueLists
	 * @covers JobQueue
	 */
	public function testRootDeduplication( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		$id = wfRandomString( 32 );
		$root1 = Job::newRootJobParams( "nulljobspam:$id" ); // task ID/timestamp
		for ( $i = 0; $i < 5; ++$i ) {
			$this->assertNull( $queue->push( $this->newJob( 0, $root1 ) ), "Push worked ($desc)" );
		}
		$queue->deduplicateRootJob( $this->newJob( 0, $root1 ) );

		$root2 = $root1;
		// Add a second to the root job timestamp. The TS_MW value is converted with
		// wfTimestamp() in both directions rather than strtotime(), because strtotime()
		// interprets the string in PHP's default timezone and could shift the result
		// by the local UTC offset.
		$root2['rootJobTimestamp'] = wfTimestamp(
			TS_MW,
			(int)wfTimestamp( TS_UNIX, $root2['rootJobTimestamp'] ) + 1
		);

		$this->assertNotEquals( $root1['rootJobTimestamp'], $root2['rootJobTimestamp'],
			"Root job signatures have different timestamps." );
		for ( $i = 0; $i < 5; ++$i ) {
			$this->assertNull( $queue->push( $this->newJob( 0, $root2 ) ), "Push worked ($desc)" );
		}
		$queue->deduplicateRootJob( $this->newJob( 0, $root2 ) );

		$this->assertFalse( $queue->isEmpty(), "Queue is not empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 10, $queue->getSize(), "Queue size is correct ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );

		// Drain the queue, counting how many popped jobs came back as duplicates
		$dupcount = 0;
		$jobs = [];
		do {
			$job = $queue->pop();
			if ( $job ) {
				$jobs[] = $job;
				$queue->ack( $job );
			}
			if ( $job instanceof DuplicateJob ) {
				++$dupcount;
			}
		} while ( $job );

		$this->assertCount( 10, $jobs, "Correct number of jobs popped ($desc)" );
		$this->assertEquals( 5, $dupcount, "Correct number of duplicate jobs popped ($desc)" );
	}

	/**
	 * FIFO queues must pop jobs in the order they were pushed.
	 *
	 * @dataProvider provider_fifoQueueLists
	 * @covers JobQueue
	 */
	public function testJobOrder( $queue, $recycles, $desc ) {
		$queue = $this->$queue;
		if ( !$queue ) {
			$this->markTestSkipped( $desc );
		}

		$this->assertTrue( $queue->isEmpty(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "Queue is empty ($desc)" );

		for ( $i = 0; $i < 10; ++$i ) {
			$this->assertNull( $queue->push( $this->newJob( $i ) ), "Push worked ($desc)" );
		}

		for ( $i = 0; $i < 10; ++$i ) {
			$job = $queue->pop();
			$this->assertInstanceOf( Job::class, $job, "Jobs popped from queue ($desc)" );
			$params = $job->getParams();
			// Each job carries its push index in the 'i' parameter
			$this->assertEquals( $i, $params['i'], "Job popped from queue is FIFO ($desc)" );
			$queue->ack( $job );
		}

		// All ten jobs were popped above, so another pop() must come back empty
		$this->assertFalse( $queue->pop(), "Queue is empty ($desc)" );

		$queue->flushCaches();
		$this->assertEquals( 0, $queue->getSize(), "Queue is empty ($desc)" );
		$this->assertEquals( 0, $queue->getAcquiredCount(), "No jobs active ($desc)" );
	}

	/**
	 * A queue must show up in getServerQueuesWithJobs() only once it holds a job.
	 * Skipped for queue classes that do not provide that method.
	 *
	 * @covers JobQueue
	 */
	public function testQueueAggregateTable() {
		$queue = $this->queueFifo;
		if ( !$queue || !method_exists( $queue, 'getServerQueuesWithJobs' ) ) {
			$this->markTestSkipped();
		}

		$this->assertNotContains(
			[ $queue->getType(), $queue->getWiki() ],
			$queue->getServerQueuesWithJobs(),
			"Null queue not in listing"
		);

		$queue->push( $this->newJob( 0 ) );

		$this->assertContains(
			[ $queue->getType(), $queue->getWiki() ],
			$queue->getServerQueuesWithJobs(),
			"Null queue in listing"
		);
	}

	/**
	 * All queue variants.
	 *
	 * @return array[] Rows of [ queue property name, $recycles flag, description ];
	 *  the flag is true for exactly the variants configured with a non-zero claimTTL
	 */
	public static function provider_queueLists() {
		return [
			[ 'queueRand', false, 'Random queue without ack()' ],
			[ 'queueRandTTL', true, 'Random queue with ack()' ],
			[ 'queueTimestamp', false, 'Time ordered queue without ack()' ],
			[ 'queueTimestampTTL', true, 'Time ordered queue with ack()' ],
			[ 'queueFifo', false, 'FIFO ordered queue without ack()' ],
			[ 'queueFifoTTL', true, 'FIFO ordered queue with ack()' ]
		];
	}

	/**
	 * Only the FIFO-ordered queue variants.
	 *
	 * @return array[] Rows of [ queue property name, $recycles flag, description ]
	 */
	public static function provider_fifoQueueLists() {
		return [
			[ 'queueFifo', false, 'Ordered queue without ack()' ],
			[ 'queueFifoTTL', true, 'Ordered queue with ack()' ]
		];
	}

	/**
	 * Creates a NullJob on the main page that does not request duplicate removal.
	 *
	 * @param int $i Value stored in the job's 'i' parameter
	 * @param array $rootJob Extra parameters appended to the job parameters;
	 *  the fixed parameters win on key collisions
	 * @return NullJob
	 */
	public function newJob( $i = 0, $rootJob = [] ) {
		return new NullJob( Title::newMainPage(),
			[ 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 0, 'i' => $i ] + $rootJob );
	}

	/**
	 * Creates a NullJob on the main page that requests duplicate removal.
	 *
	 * @param int $i Value stored in the job's 'i' parameter
	 * @param array $rootJob Extra parameters appended to the job parameters;
	 *  the fixed parameters win on key collisions
	 * @return NullJob
	 */
	public function newDedupedJob( $i = 0, $rootJob = [] ) {
		return new NullJob( Title::newMainPage(),
			[ 'lives' => 0, 'usleep' => 0, 'removeDuplicates' => 1, 'i' => $i ] + $rootJob );
	}
}
390
/**
 * JobQueueDB variant used by JobQueueTest that obtains its connection from the
 * main load balancer without passing any connection flags.
 */
class JobQueueDBSingle extends JobQueueDB {
	/**
	 * @param int $index Server index handed through to the load balancer
	 * @return mixed Whatever the load balancer's getConnection() returns
	 */
	protected function getDB( $index ) {
		// Override to not use CONN_TRX_AUTOCOMMIT so that we see the same temporary `job` table
		return MediaWikiServices::getInstance()
			->getDBLoadBalancer()
			->getConnection( $index, [], $this->domain );
	}
}