Introduce BlobStore::getBlobBatch method
authorPetr Pchelko <ppchelko@wikimedia.org>
Mon, 26 Aug 2019 20:18:50 +0000 (13:18 -0700)
committerMobrovac <mobrovac@wikimedia.org>
Fri, 30 Aug 2019 09:40:49 +0000 (09:40 +0000)
Bug: T230834
Change-Id: I56306c50a6617dc91e4eb362ef010703ac25d951

includes/Storage/BlobStore.php
includes/Storage/SqlBlobStore.php
tests/phpunit/includes/Storage/SqlBlobStoreTest.php

index 8b1112b..78885db 100644 (file)
@@ -22,6 +22,8 @@
 
 namespace MediaWiki\Storage;
 
+use StatusValue;
+
 /**
  * Service for loading and storing data blobs.
  *
@@ -95,6 +97,19 @@ interface BlobStore {
         */
        public function getBlob( $blobAddress, $queryFlags = 0 );
 
+       /**
+        * A batched version of BlobStore::getBlob.
+        *
+        * @param string[] $blobAddresses An array of blob addresses.
+        * @param int $queryFlags See IDBAccessObject.
+        * @throws BlobAccessException
+        * @return StatusValue A status with a map of blobAddress => binary blob data or null
+        *         if fetching the blob has failed. Fetch failures errors are the
+        *         warnings in the status object.
+        * @since 1.34
+        */
+       public function getBlobBatch( $blobAddresses, $queryFlags = 0 );
+
        /**
         * Stores an arbitrary blob of data and returns an address that can be used with
         * getBlob() to retrieve the same blob of data,
index d1b688b..8c011df 100644 (file)
 
 namespace MediaWiki\Storage;
 
+use AppendIterator;
 use DBAccessObjectUtils;
 use IDBAccessObject;
 use IExpiringStore;
 use InvalidArgumentException;
 use Language;
 use MWException;
+use StatusValue;
 use WANObjectCache;
 use ExternalStoreAccess;
 use Wikimedia\Assert\Assert;
@@ -277,92 +279,170 @@ class SqlBlobStore implements IDBAccessObject, BlobStore {
        public function getBlob( $blobAddress, $queryFlags = 0 ) {
                Assert::parameterType( 'string', $blobAddress, '$blobAddress' );
 
-               // No negative caching; negative hits on text rows may be due to corrupted replica DBs
+               $error = null;
                $blob = $this->cache->getWithSetCallback(
                        $this->getCacheKey( $blobAddress ),
                        $this->getCacheTTL(),
-                       function ( $unused, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags ) {
+                       function ( $unused, &$ttl, &$setOpts ) use ( $blobAddress, $queryFlags, &$error ) {
                                // Ignore $setOpts; blobs are immutable and negatives are not cached
-                               return $this->fetchBlob( $blobAddress, $queryFlags );
+                               list( $result, $errors ) = $this->fetchBlobs( [ $blobAddress ], $queryFlags );
+                               // No negative caching; negative hits on text rows may be due to corrupted replica DBs
+                               $error = $errors[$blobAddress] ?? null;
+                               return $result[$blobAddress];
                        },
                        [ 'pcGroup' => self::TEXT_CACHE_GROUP, 'pcTTL' => IExpiringStore::TTL_PROC_LONG ]
                );
 
-               if ( $blob === false ) {
-                       throw new BlobAccessException( 'Failed to load blob from address ' . $blobAddress );
+               if ( $error ) {
+                       throw new BlobAccessException( $error );
                }
 
+               Assert::postcondition( is_string( $blob ), 'Blob must not be null' );
                return $blob;
        }
 
+       /**
+        * A batched version of BlobStore::getBlob.
+        *
+        * @param string[] $blobAddresses An array of blob addresses.
+        * @param int $queryFlags See IDBAccessObject.
+        * @throws BlobAccessException
+        * @return StatusValue A status with a map of blobAddress => binary blob data or null
+        *         if fetching the blob has failed. Fetch failures errors are the
+        *         warnings in the status object.
+        * @since 1.34
+        */
+       public function getBlobBatch( $blobAddresses, $queryFlags = 0 ) {
+               $errors = null;
+               $addressByCacheKey = $this->cache->makeMultiKeys(
+                       $blobAddresses,
+                       function ( $blobAddress ) {
+                               return $this->getCacheKey( $blobAddress );
+                       }
+               );
+               $blobsByCacheKey = $this->cache->getMultiWithUnionSetCallback(
+                       $addressByCacheKey,
+                       $this->getCacheTTL(),
+                       function ( array $blobAddresses, array &$ttls, array &$setOpts ) use ( $queryFlags, &$errors ) {
+                               // Ignore $setOpts; blobs are immutable and negatives are not cached
+                               list( $result, $errors ) = $this->fetchBlobs( $blobAddresses, $queryFlags );
+                               return $result;
+                       },
+                       [ 'pcGroup' => self::TEXT_CACHE_GROUP, 'pcTTL' => IExpiringStore::TTL_PROC_LONG ]
+               );
+
+               // Remap back to incoming blob addresses. The return value of the
+               // WANObjectCache::getMultiWithUnionSetCallback is keyed on the internal
+               // keys from WANObjectCache::makeMultiKeys, so we need to remap them
+               // before returning to the client.
+               $blobsByAddress = [];
+               foreach ( $blobsByCacheKey as $cacheKey => $blob ) {
+                       $blobsByAddress[ $addressByCacheKey[ $cacheKey ] ] = $blob !== false ? $blob : null;
+               }
+
+               $result = StatusValue::newGood( $blobsByAddress );
+               if ( $errors ) {
+                       foreach ( $errors as $error ) {
+                               $result->warning( 'internalerror', $error );
+                       }
+               }
+               return $result;
+       }
+
        /**
         * MCR migration note: this corresponds to Revision::fetchText
         *
-        * @param string $blobAddress
+        * @param string[] $blobAddresses
         * @param int $queryFlags
         *
         * @throws BlobAccessException
-        * @return string|false
-        */
-       private function fetchBlob( $blobAddress, $queryFlags ) {
-               list( $schema, $id, ) = self::splitBlobAddress( $blobAddress );
+        * @return array [ $result, $errors ] A map of blob addresses to successfully fetched blobs
+        *         or false if fetch failed, plus and array of errors
+        */
+       private function fetchBlobs( $blobAddresses, $queryFlags ) {
+               $textIdToBlobAddress = [];
+               $result = [];
+               $errors = [];
+               foreach ( $blobAddresses as $blobAddress ) {
+                       list( $schema, $id ) = self::splitBlobAddress( $blobAddress );
+                       //TODO: MCR: also support 'ex' schema with ExternalStore URLs, plus flags encoded in the URL!
+                       if ( $schema === 'tt' ) {
+                               $textId = intval( $id );
+                               $textIdToBlobAddress[$textId] = $blobAddress;
+                       } else {
+                               $errors[$blobAddress] = "Unknown blob address schema: $schema";
+                               $result[$blobAddress] = false;
+                               continue;
+                       }
 
-               //TODO: MCR: also support 'ex' schema with ExternalStore URLs, plus flags encoded in the URL!
-               if ( $schema === 'tt' ) {
-                       $textId = intval( $id );
-               } else {
-                       // XXX: change to better exceptions! That makes migration more difficult, though.
-                       throw new BlobAccessException( "Unknown blob address schema: $schema" );
+                       if ( !$textId || $id !== (string)$textId ) {
+                               $errors[$blobAddress] = "Bad blob address: $blobAddress";
+                               $result[$blobAddress] = false;
+                       }
                }
 
-               if ( !$textId || $id !== (string)$textId ) {
-                       // XXX: change to better exceptions! That makes migration more difficult, though.
-                       throw new BlobAccessException( "Bad blob address: $blobAddress" );
+               $textIds = array_keys( $textIdToBlobAddress );
+               if ( !$textIds ) {
+                       return [ $result, $errors ];
                }
-
                // Callers doing updates will pass in READ_LATEST as usual. Since the text/blob tables
                // do not normally get rows changed around, set READ_LATEST_IMMUTABLE in those cases.
                $queryFlags |= DBAccessObjectUtils::hasFlags( $queryFlags, self::READ_LATEST )
                        ? self::READ_LATEST_IMMUTABLE
                        : 0;
-
                list( $index, $options, $fallbackIndex, $fallbackOptions ) =
                        DBAccessObjectUtils::getDBOptions( $queryFlags );
-
                // Text data is immutable; check replica DBs first.
-               $row = $this->getDBConnection( $index )->selectRow(
+               $dbConnection = $this->getDBConnection( $index );
+               $rows = $dbConnection->select(
                        'text',
-                       [ 'old_text', 'old_flags' ],
-                       [ 'old_id' => $textId ],
+                       [ 'old_id', 'old_text', 'old_flags' ],
+                       [ 'old_id' => $textIds ],
                        __METHOD__,
                        $options
                );
 
-               // Fallback to DB_MASTER in some cases if the row was not found, using the appropriate
+               // Fallback to DB_MASTER in some cases if not all the rows were found, using the appropriate
                // options, such as FOR UPDATE to avoid missing rows due to REPEATABLE-READ.
-               if ( !$row && $fallbackIndex !== null ) {
-                       $row = $this->getDBConnection( $fallbackIndex )->selectRow(
+               if ( $dbConnection->numRows( $rows ) !== count( $textIds ) && $fallbackIndex !== null ) {
+                       $fetchedTextIds = [];
+                       foreach ( $rows as $row ) {
+                               $fetchedTextIds[] = $row->old_id;
+                       }
+                       $missingTextIds = array_diff( $textIds, $fetchedTextIds );
+                       $dbConnection = $this->getDBConnection( $fallbackIndex );
+                       $rowsFromFallback = $dbConnection->select(
                                'text',
-                               [ 'old_text', 'old_flags' ],
-                               [ 'old_id' => $textId ],
+                               [ 'old_id', 'old_text', 'old_flags' ],
+                               [ 'old_id' => $missingTextIds ],
                                __METHOD__,
                                $fallbackOptions
                        );
+                       $appendIterator = new AppendIterator();
+                       $appendIterator->append( $rows );
+                       $appendIterator->append( $rowsFromFallback );
+                       $rows = $appendIterator;
                }
 
-               if ( !$row ) {
-                       wfWarn( __METHOD__ . ": No text row with ID $textId." );
-                       return false;
+               foreach ( $rows as $row ) {
+                       $blobAddress = $textIdToBlobAddress[$row->old_id];
+                       $blob = $this->expandBlob( $row->old_text, $row->old_flags, $blobAddress );
+                       if ( $blob === false ) {
+                               $errors[$blobAddress] = "Bad data in text row {$row->old_id}.";
+                       }
+                       $result[$blobAddress] = $blob;
                }
 
-               $blob = $this->expandBlob( $row->old_text, $row->old_flags, $blobAddress );
-
-               if ( $blob === false ) {
-                       wfLogWarning( __METHOD__ . ": Bad data in text row $textId." );
-                       return false;
+               // If we're still missing some of the rows, set errors for missing blobs.
+               if ( count( $result ) !== count( $blobAddresses ) ) {
+                       foreach ( $blobAddresses as $blobAddress ) {
+                               if ( !isset( $result[$blobAddress ] ) ) {
+                                       $errors[$blobAddress] = "Unable to fetch blob at $blobAddress";
+                                       $result[$blobAddress] = false;
+                               }
+                       }
                }
-
-               return $blob;
+               return [ $result, $errors ];
        }
 
        /**
index ac39b48..1d88122 100644 (file)
@@ -5,6 +5,7 @@ namespace MediaWiki\Tests\Storage;
 use InvalidArgumentException;
 use Language;
 use MediaWiki\MediaWikiServices;
+use MediaWiki\Storage\BlobAccessException;
 use MediaWiki\Storage\SqlBlobStore;
 use MediaWikiTestCase;
 use stdClass;
@@ -218,6 +219,7 @@ class SqlBlobStoreTest extends MediaWikiTestCase {
        }
 
        /**
+        * @param string $blob
         * @dataProvider provideBlobs
         * @covers \MediaWiki\Storage\SqlBlobStore::storeBlob
         * @covers \MediaWiki\Storage\SqlBlobStore::getBlob
@@ -228,6 +230,109 @@ class SqlBlobStoreTest extends MediaWikiTestCase {
                $this->assertSame( $blob, $store->getBlob( $address ) );
        }
 
+       /**
+        * @covers \MediaWiki\Storage\SqlBlobStore::storeBlob
+        * @covers \MediaWiki\Storage\SqlBlobStore::getBlobBatch
+        */
+       public function testSimpleStorageGetBlobBatchSimpleEmpty() {
+               $store = $this->getBlobStore();
+               $this->assertArrayEquals(
+                       [],
+                       $store->getBlobBatch( [] )->getValue()
+               );
+       }
+
+       /**
+        * @param string $blob
+        * @dataProvider provideBlobs
+        * @covers \MediaWiki\Storage\SqlBlobStore::storeBlob
+        * @covers \MediaWiki\Storage\SqlBlobStore::getBlobBatch
+        */
+       public function testSimpleStorageGetBlobBatchSimpleRoundtrip( $blob ) {
+               $store = $this->getBlobStore();
+               $addresses = [
+                       $store->storeBlob( $blob ),
+                       $store->storeBlob( $blob . '1' )
+               ];
+               $this->assertArrayEquals(
+                       array_combine( $addresses, [ $blob, $blob . '1' ] ),
+                       $store->getBlobBatch( $addresses )->getValue()
+               );
+       }
+
+       /**
+        * @covers \MediaWiki\Storage\SqlBlobStore::getBlob
+        */
+       public function testSimpleStorageNonExistentBlob() {
+               $this->setExpectedException( BlobAccessException::class );
+               $store = $this->getBlobStore();
+               $store->getBlob( 'tt:this_will_not_exist' );
+       }
+
+       /**
+        * @covers \MediaWiki\Storage\SqlBlobStore::getBlobBatch
+        */
+       public function testSimpleStorageNonExistentBlobBatch() {
+               $store = $this->getBlobStore();
+               $result = $store->getBlobBatch( [ 'tt:this_will_not_exist', 'tt:1000', 'bla:1001' ] );
+               $this->assertSame(
+                       [
+                               'tt:this_will_not_exist' => null,
+                               'tt:1000' => null,
+                               'bla:1001' => null
+                       ],
+                       $result->getValue()
+               );
+               $this->assertSame( [
+                       [
+                               'type' => 'warning',
+                               'message' => 'internalerror',
+                               'params' => [
+                                       'Bad blob address: tt:this_will_not_exist'
+                               ]
+                       ],
+                       [
+                               'type' => 'warning',
+                               'message' => 'internalerror',
+                               'params' => [
+                                       'Unknown blob address schema: bla'
+                               ]
+                       ],
+                       [
+                               'type' => 'warning',
+                               'message' => 'internalerror',
+                               'params' => [
+                                       'Unable to fetch blob at tt:1000'
+                               ]
+                       ]
+               ], $result->getErrors() );
+       }
+
+       /**
+        * @covers \MediaWiki\Storage\SqlBlobStore::getBlobBatch
+        */
+       public function testSimpleStoragePartialNonExistentBlobBatch() {
+               $store = $this->getBlobStore();
+               $address = $store->storeBlob( 'test_data' );
+               $result = $store->getBlobBatch( [ $address, 'tt:this_will_not_exist_too' ] );
+               $this->assertSame(
+                       [
+                               $address => 'test_data',
+                               'tt:this_will_not_exist_too' => null
+                       ],
+                       $result->getValue()
+               );
+               $this->assertSame( [
+                       [
+                               'type' => 'warning',
+                               'message' => 'internalerror',
+                               'params' => [
+                                       'Bad blob address: tt:this_will_not_exist_too'
+                               ]
+                       ],
+               ], $result->getErrors() );
+       }
+
        /**
         * @dataProvider provideBlobs
         * @covers \MediaWiki\Storage\SqlBlobStore::storeBlob