objectcache: add BagOStuff::deleteMulti() method for consistency
authorAaron Schulz <aschulz@wikimedia.org>
Tue, 12 Mar 2019 07:38:56 +0000 (00:38 -0700)
committerAaron Schulz <aschulz@wikimedia.org>
Thu, 14 Mar 2019 23:12:30 +0000 (23:12 +0000)
Also:
* Make the BagOStuff tests actually pass for memcached and sql
* Add more unit tests for BagOStuff
* Make SqlBagOStuff::add() more atomic

Change-Id: Ic1eec0990a66b595b57c646498c3bd229442230c

includes/libs/objectcache/BagOStuff.php
includes/libs/objectcache/RedisBagOStuff.php
includes/libs/objectcache/ReplicatedBagOStuff.php
includes/objectcache/SqlBagOStuff.php
tests/phpunit/includes/libs/objectcache/BagOStuffTest.php

index 2f10708..9c75a2f 100644 (file)
@@ -463,7 +463,7 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
                        function () use ( $key, $expiry, $fname ) {
                                $this->clearLastError();
                                if ( $this->add( "{$key}:lock", 1, $expiry ) ) {
-                                       return true; // locked!
+                                       return WaitConditionLoop::CONDITION_REACHED; // locked!
                                } elseif ( $this->getLastError() ) {
                                        $this->logger->warning(
                                                $fname . ' failed due to I/O error for {key}.',
@@ -606,6 +606,22 @@ abstract class BagOStuff implements IExpiringStore, LoggerAwareInterface {
                return $res;
        }
 
+       /**
+        * Batch deletion
+        * @param string[] $keys List of keys
+        * @param int $flags Bitfield of BagOStuff::WRITE_* constants
+        * @return bool Success
+        * @since 1.33
+        */
+       public function deleteMulti( array $keys, $flags = 0 ) {
+               $res = true;
+               foreach ( $keys as $key ) {
+                       $res = $this->delete( $key, $flags ) && $res;
+               }
+
+               return $res;
+       }
+
        /**
         * Insertion
         * @param string $key
index 2bf0c48..f64fe7e 100644 (file)
@@ -233,6 +233,45 @@ class RedisBagOStuff extends BagOStuff {
                return $result;
        }
 
+       public function deleteMulti( array $keys, $flags = 0 ) {
+               $batches = [];
+               $conns = [];
+               foreach ( $keys as $key ) {
+                       list( $server, $conn ) = $this->getConnection( $key );
+                       if ( !$conn ) {
+                               continue;
+                       }
+                       $conns[$server] = $conn;
+                       $batches[$server][] = $key;
+               }
+
+               $result = true;
+               foreach ( $batches as $server => $batchKeys ) {
+                       $conn = $conns[$server];
+                       try {
+                               $conn->multi( Redis::PIPELINE );
+                               foreach ( $batchKeys as $key ) {
+                                       $conn->delete( $key );
+                               }
+                               $batchResult = $conn->exec();
+                               if ( $batchResult === false ) {
+                                       $this->debug( "deleteMulti request to $server failed" );
+                                       continue;
+                               }
+                               foreach ( $batchResult as $value ) {
+                                       if ( $value === false ) {
+                                               $result = false;
+                                       }
+                               }
+                       } catch ( RedisException $e ) {
+                               $this->handleException( $conn, $e );
+                               $result = false;
+                       }
+               }
+
+               return $result;
+       }
+
        public function add( $key, $value, $expiry = 0, $flags = 0 ) {
                list( $server, $conn ) = $this->getConnection( $key );
                if ( !$conn ) {
index 6afdc1c..ea380a6 100644 (file)
@@ -90,10 +90,18 @@ class ReplicatedBagOStuff extends BagOStuff {
                return $this->writeStore->set( $key, $value, $exptime, $flags );
        }
 
+       public function setMulti( array $keys, $exptime = 0, $flags = 0 ) {
+               return $this->writeStore->setMulti( $keys, $exptime, $flags );
+       }
+
        public function delete( $key, $flags = 0 ) {
                return $this->writeStore->delete( $key, $flags );
        }
 
+       public function deleteMulti( array $keys, $flags = 0 ) {
+               return $this->writeStore->deleteMulti( $keys, $flags );
+       }
+
        public function add( $key, $value, $exptime = 0, $flags = 0 ) {
                return $this->writeStore->add( $key, $value, $exptime );
        }
index 66b488e..b2d61a8 100644 (file)
@@ -312,6 +312,10 @@ class SqlBagOStuff extends BagOStuff {
        }
 
        public function setMulti( array $data, $expiry = 0, $flags = 0 ) {
+               return $this->insertMulti( $data, $expiry, $flags, true );
+       }
+
+       private function insertMulti( array $data, $expiry, $flags, $replace ) {
                $keysByTable = [];
                foreach ( $data as $key => $value ) {
                        list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
@@ -354,19 +358,22 @@ class SqlBagOStuff extends BagOStuff {
                                }
 
                                try {
-                                       $db->replace(
-                                               $tableName,
-                                               [ 'keyname' ],
-                                               $rows,
-                                               __METHOD__
-                                       );
+                                       if ( $replace ) {
+                                               $db->replace( $tableName, [ 'keyname' ], $rows, __METHOD__ );
+                                       } else {
+                                               $db->insert( $tableName, $rows, __METHOD__, [ 'IGNORE' ] );
+                                               $result = ( $db->affectedRows() > 0 && $result );
+                                       }
                                } catch ( DBError $e ) {
                                        $this->handleWriteError( $e, $db, $serverIndex );
                                        $result = false;
                                }
 
                        }
+               }
 
+               if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
+                       $result = $this->waitForReplication() && $result;
                }
 
                return $result;
@@ -374,13 +381,16 @@ class SqlBagOStuff extends BagOStuff {
 
        public function set( $key, $value, $exptime = 0, $flags = 0 ) {
                $ok = $this->setMulti( [ $key => $value ], $exptime );
-               if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
-                       $ok = $this->waitForReplication() && $ok;
-               }
 
                return $ok;
        }
 
+       public function add( $key, $value, $exptime = 0, $flags = 0 ) {
+               $added = $this->insertMulti( [ $key => $value ], $exptime, $flags, false );
+
+               return $added;
+       }
+
        protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
                list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
                $db = null;
@@ -423,26 +433,46 @@ class SqlBagOStuff extends BagOStuff {
                return (bool)$db->affectedRows();
        }
 
-       public function delete( $key, $flags = 0 ) {
-               $ok = true;
+       public function deleteMulti( array $keys, $flags = 0 ) {
+               $keysByTable = [];
+               foreach ( $keys as $key ) {
+                       list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
+                       $keysByTable[$serverIndex][$tableName][] = $key;
+               }
 
-               list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
-               $db = null;
+               $result = true;
                $silenceScope = $this->silenceTransactionProfiler();
-               try {
-                       $db = $this->getDB( $serverIndex );
-                       $db->delete(
-                               $tableName,
-                               [ 'keyname' => $key ],
-                               __METHOD__ );
-               } catch ( DBError $e ) {
-                       $this->handleWriteError( $e, $db, $serverIndex );
-                       $ok = false;
+               foreach ( $keysByTable as $serverIndex => $serverKeys ) {
+                       $db = null;
+                       try {
+                               $db = $this->getDB( $serverIndex );
+                       } catch ( DBError $e ) {
+                               $this->handleWriteError( $e, $db, $serverIndex );
+                               $result = false;
+                               continue;
+                       }
+
+                       foreach ( $serverKeys as $tableName => $tableKeys ) {
+                               try {
+                                       $db->delete( $tableName, [ 'keyname' => $tableKeys ], __METHOD__ );
+                               } catch ( DBError $e ) {
+                                       $this->handleWriteError( $e, $db, $serverIndex );
+                                       $result = false;
+                               }
+
+                       }
                }
+
                if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
-                       $ok = $this->waitForReplication() && $ok;
+                       $result = $this->waitForReplication() && $result;
                }
 
+               return $result;
+       }
+
+       public function delete( $key, $flags = 0 ) {
+               $ok = $this->deleteMulti( [ $key ], $flags );
+
                return $ok;
        }
 
@@ -458,31 +488,34 @@ class SqlBagOStuff extends BagOStuff {
                                [ 'value', 'exptime' ],
                                [ 'keyname' => $key ],
                                __METHOD__,
-                               [ 'FOR UPDATE' ] );
+                               [ 'FOR UPDATE' ]
+                       );
                        if ( $row === false ) {
                                // Missing
-
-                               return null;
+                               return false;
                        }
                        $db->delete( $tableName, [ 'keyname' => $key ], __METHOD__ );
                        if ( $this->isExpired( $db, $row->exptime ) ) {
                                // Expired, do not reinsert
-
-                               return null;
+                               return false;
                        }
 
                        $oldValue = intval( $this->unserialize( $db->decodeBlob( $row->value ) ) );
                        $newValue = $oldValue + $step;
-                       $db->insert( $tableName,
+                       $db->insert(
+                               $tableName,
                                [
                                        'keyname' => $key,
                                        'value' => $db->encodeBlob( $this->serialize( $newValue ) ),
                                        'exptime' => $row->exptime
-                               ], __METHOD__, 'IGNORE' );
+                               ],
+                               __METHOD__,
+                               'IGNORE'
+                       );
 
                        if ( $db->affectedRows() == 0 ) {
                                // Race condition. See T30611
-                               $newValue = null;
+                               $newValue = false;
                        }
                } catch ( DBError $e ) {
                        $this->handleWriteError( $e, $db, $serverIndex );
index f0f55fb..b68ffaf 100644 (file)
@@ -26,6 +26,7 @@ class BagOStuffTest extends MediaWikiTestCase {
                }
 
                $this->cache->delete( $this->cache->makeKey( self::TEST_KEY ) );
+               $this->cache->delete( $this->cache->makeKey( self::TEST_KEY ) . ':lock' );
        }
 
        /**
@@ -68,10 +69,25 @@ class BagOStuffTest extends MediaWikiTestCase {
         * @covers BagOStuff::mergeViaCas
         */
        public function testMerge() {
-               $calls = 0;
                $key = $this->cache->makeKey( self::TEST_KEY );
-               $callback = function ( BagOStuff $cache, $key, $oldVal ) use ( &$calls ) {
+               $locks = false;
+               $checkLockingCallback = function ( BagOStuff $cache, $key, $oldVal ) use ( &$locks ) {
+                       $locks = $cache->get( "$key:lock" );
+
+                       return false;
+               };
+
+               $this->cache->merge( $key, $checkLockingCallback, 5 );
+               $this->assertFalse( $this->cache->get( $key ) );
+
+               $calls = 0;
+               $casRace = false; // emulate a race
+               $callback = function ( BagOStuff $cache, $key, $oldVal ) use ( &$calls, &$casRace ) {
                        ++$calls;
+                       if ( $casRace ) {
+                               // Uses CAS instead?
+                               $cache->set( $key, 'conflict', 5 );
+                       }
 
                        return ( $oldVal === false ) ? 'merged' : $oldVal . 'merged';
                };
@@ -87,21 +103,43 @@ class BagOStuffTest extends MediaWikiTestCase {
                $this->assertEquals( 'mergedmerged', $this->cache->get( $key ) );
 
                $calls = 0;
-               $this->cache->lock( $key );
-               $this->assertFalse( $this->cache->merge( $key, $callback, 1 ), 'Non-blocking merge' );
-               $this->cache->unlock( $key );
-               $this->assertEquals( 0, $calls );
+               if ( $locks ) {
+                       // merge were something else already was merging (e.g. had the lock)
+                       $this->cache->lock( $key );
+                       $this->assertFalse(
+                               $this->cache->merge( $key, $callback, 5, 1 ),
+                               'Non-blocking merge (locking)'
+                       );
+                       $this->cache->unlock( $key );
+                       $this->assertEquals( 0, $calls );
+               } else {
+                       $casRace = true;
+                       $this->assertFalse(
+                               $this->cache->merge( $key, $callback, 5, 1 ),
+                               'Non-blocking merge (CAS)'
+                       );
+                       $this->assertEquals( 1, $calls );
+               }
        }
 
        /**
         * @covers BagOStuff::merge
         * @covers BagOStuff::mergeViaLock
+        * @dataProvider provideTestMerge_fork
         */
-       public function testMerge_fork() {
+       public function testMerge_fork( $exists, $winsLocking, $resLocking, $resCAS ) {
                $key = $this->cache->makeKey( self::TEST_KEY );
-               $callback = function ( BagOStuff $cache, $key, $oldVal ) {
-                       return ( $oldVal === false ) ? 'merged' : $oldVal . 'merged';
+               $pCallback = function ( BagOStuff $cache, $key, $oldVal ) {
+                       return ( $oldVal === false ) ? 'init-parent' : $oldVal . '-merged-parent';
+               };
+               $cCallback = function ( BagOStuff $cache, $key, $oldVal ) {
+                       return ( $oldVal === false ) ? 'init-child' : $oldVal . '-merged-child';
                };
+
+               if ( $exists ) {
+                       $this->cache->set( $key, 'x', 5 );
+               }
+
                /*
                 * Test concurrent merges by forking this process, if:
                 * - not manually called with --use-bagostuff
@@ -115,17 +153,21 @@ class BagOStuffTest extends MediaWikiTestCase {
                $fork &= !$this->cache instanceof MultiWriteBagOStuff;
                if ( $fork ) {
                        $pid = null;
+                       $locked = false;
                        // Function to start merge(), run another merge() midway through, then finish
-                       $outerFunc = function ( BagOStuff $cache, $key, $oldVal ) use ( $callback, &$pid ) {
+                       $func = function ( BagOStuff $cache, $key, $cur )
+                               use ( $pCallback, $cCallback, &$pid, &$locked )
+                       {
                                $pid = pcntl_fork();
                                if ( $pid == -1 ) {
                                        return false;
                                } elseif ( $pid ) {
+                                       $locked = $cache->get( "$key:lock" ); // parent has lock?
                                        pcntl_wait( $status );
 
-                                       return $callback( $cache, $key, $oldVal );
+                                       return $pCallback( $cache, $key, $cur );
                                } else {
-                                       $this->cache->merge( $key, $callback, 0, 1 );
+                                       $this->cache->merge( $key, $cCallback, 0, 1 );
                                        // Bail out of the outer merge() in the child process since it does not
                                        // need to attempt to write anything. Success is checked by the parent.
                                        parent::tearDown(); // avoid phpunit notices
@@ -134,22 +176,34 @@ class BagOStuffTest extends MediaWikiTestCase {
                        };
 
                        // attempt a merge - this should fail
-                       $merged = $this->cache->merge( $key, $outerFunc, 0, 1 );
+                       $merged = $this->cache->merge( $key, $func, 0, 1 );
 
                        if ( $pid == -1 ) {
                                return; // can't fork, ignore this test...
                        }
 
-                       // merge has failed because child process was merging (and we only attempted once)
-                       $this->assertFalse( $merged );
-
-                       // make sure the child's merge is completed and verify
-                       $this->assertEquals( $this->cache->get( $key ), 'mergedmerged' );
+                       if ( $locked ) {
+                               // merge succeed since child was locked out
+                               $this->assertEquals( $winsLocking, $merged );
+                               $this->assertEquals( $this->cache->get( $key ), $resLocking );
+                       } else {
+                               // merge has failed because child process was merging (and we only attempted once)
+                               $this->assertEquals( !$winsLocking, $merged );
+                               $this->assertEquals( $this->cache->get( $key ), $resCAS );
+                       }
                } else {
                        $this->markTestSkipped( 'No pcntl methods available' );
                }
        }
 
+       function provideTestMerge_fork() {
+               return [
+                       // (already exists, parent wins if locking, result if locking, result if CAS)
+                       [ false, true, 'init-parent', 'init-child' ],
+                       [ true, true, 'x-merged-parent', 'x-merged-child' ]
+               ];
+       }
+
        /**
         * @covers BagOStuff::changeTTL
         */
@@ -266,6 +320,34 @@ class BagOStuffTest extends MediaWikiTestCase {
                $this->cache->delete( $key4 );
        }
 
+       /**
+        * @covers BagOStuff::setMulti
+        * @covers BagOStuff::deleteMulti
+        */
+       public function testSetDeleteMulti() {
+               $map = [
+                       $this->cache->makeKey( 'test-1' ) => 'Siberian',
+                       $this->cache->makeKey( 'test-2' ) => [ 'Huskies' ],
+                       $this->cache->makeKey( 'test-3' ) => [ 'are' => 'the' ],
+                       $this->cache->makeKey( 'test-4' ) => (object)[ 'greatest' => 'animal' ],
+                       $this->cache->makeKey( 'test-5' ) => 4,
+                       $this->cache->makeKey( 'test-6' ) => 'ever'
+               ];
+
+               $this->cache->setMulti( $map, 5 );
+               $this->assertEquals(
+                       $map,
+                       $this->cache->getMulti( array_keys( $map ) )
+               );
+
+               $this->assertTrue( $this->cache->deleteMulti( array_keys( $map ), 5 ) );
+
+               $this->assertEquals(
+                       [],
+                       $this->cache->getMulti( array_keys( $map ) )
+               );
+       }
+
        /**
         * @covers BagOStuff::getScopedLock
         */