Database: Add batching to non-native insertSelect()
authorBrad Jorsch <bjorsch@wikimedia.org>
Thu, 8 Feb 2018 19:16:29 +0000 (14:16 -0500)
committerBrad Jorsch <bjorsch@wikimedia.org>
Wed, 28 Feb 2018 18:58:37 +0000 (13:58 -0500)
It would be easy for a call to nonNativeInsertSelect() to generate an
INSERT that's too big for the database to actually process. Add batching
to try to avoid that.

Bug: T160993
Change-Id: I1de994208d95926f0d75c0d7cab7b5fe1dd565c3

includes/libs/rdbms/database/Database.php
tests/phpunit/includes/libs/rdbms/database/DatabaseSQLTest.php

index 572a798..fd7bfe9 100644 (file)
@@ -236,6 +236,9 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
        /** @var TransactionProfiler */
        protected $trxProfiler;
 
+       /** @var int */
+       protected $nonNativeInsertSelectBatchSize = 10000;
+
        /**
         * Constructor and database handle and attempt to connect to the DB server
         *
@@ -278,6 +281,10 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                $this->queryLogger = $params['queryLogger'];
                $this->errorLogger = $params['errorLogger'];
 
+               if ( isset( $params['nonNativeInsertSelectBatchSize'] ) ) {
+                       $this->nonNativeInsertSelectBatchSize = $params['nonNativeInsertSelectBatchSize'];
+               }
+
                // Set initial dummy domain until open() sets the final DB/prefix
                $this->currentDomain = DatabaseDomain::newUnspecified();
 
@@ -331,6 +338,7 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
         *   - cliMode: Whether to consider the execution context that of a CLI script.
         *   - agent: Optional name used to identify the end-user in query profiling/logging.
         *   - srvCache: Optional BagOStuff instance to an APC-style cache.
+        *   - nonNativeInsertSelectBatchSize: Optional batch size for non-native INSERT SELECT emulation.
         * @return Database|null If the database driver or extension cannot be found
         * @throws InvalidArgumentException If the database driver or extension cannot be found
         * @since 1.18
@@ -2504,12 +2512,43 @@ abstract class Database implements IDatabase, IMaintainableDatabase, LoggerAware
                        return false;
                }
 
-               $rows = [];
-               foreach ( $res as $row ) {
-                       $rows[] = (array)$row;
+               try {
+                       $affectedRowCount = 0;
+                       $this->startAtomic( $fname );
+                       $rows = [];
+                       $ok = true;
+                       foreach ( $res as $row ) {
+                               $rows[] = (array)$row;
+
+                               // Avoid inserts that are too huge
+                               if ( count( $rows ) >= $this->nonNativeInsertSelectBatchSize ) {
+                                       $ok = $this->insert( $destTable, $rows, $fname, $insertOptions );
+                                       if ( !$ok ) {
+                                               break;
+                                       }
+                                       $affectedRowCount += $this->affectedRows();
+                                       $rows = [];
+                               }
+                       }
+                       if ( $rows && $ok ) {
+                               $ok = $this->insert( $destTable, $rows, $fname, $insertOptions );
+                               if ( $ok ) {
+                                       $affectedRowCount += $this->affectedRows();
+                               }
+                       }
+                       if ( $ok ) {
+                               $this->endAtomic( $fname );
+                               $this->affectedRowCount = $affectedRowCount;
+                       } else {
+                               $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                               $this->affectedRowCount = 0;
+                       }
+                       return $ok;
+               } catch ( Exception $e ) {
+                       $this->rollback( $fname, self::FLUSHING_INTERNAL );
+                       $this->affectedRowCount = 0;
+                       throw $e;
                }
-
-               return $this->insert( $destTable, $rows, $fname, $insertOptions );
        }
 
        /**
index ebf6e45..8886544 100644 (file)
@@ -457,7 +457,7 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                        isset( $sql['selectOptions'] ) ? $sql['selectOptions'] : [],
                        isset( $sql['selectJoinConds'] ) ? $sql['selectJoinConds'] : []
                );
-               $this->assertLastSqlDb( implode( '; ', [ $sqlSelect, $sqlInsert ] ), $dbWeb );
+               $this->assertLastSqlDb( implode( '; ', [ $sqlSelect, 'BEGIN', $sqlInsert, 'COMMIT' ] ), $dbWeb );
        }
 
        public static function provideInsertSelect() {
@@ -537,6 +537,30 @@ class DatabaseSQLTest extends PHPUnit\Framework\TestCase {
                ];
        }
 
+       public function testInsertSelectBatching() {
+               $dbWeb = new DatabaseTestHelper( __CLASS__, [ 'cliMode' => false ] );
+               $rows = [];
+               for ( $i = 0; $i <= 25000; $i++ ) {
+                       $rows[] = [ 'field' => $i ];
+               }
+               $dbWeb->forceNextResult( $rows );
+               $dbWeb->insertSelect(
+                       'insert_table',
+                       'select_table',
+                       [ 'field' => 'field2' ],
+                       '*',
+                       __METHOD__
+               );
+               $this->assertLastSqlDb( implode( '; ', [
+                       'SELECT field2 AS field FROM select_table WHERE *   FOR UPDATE',
+                       'BEGIN',
+                       "INSERT INTO insert_table (field) VALUES ('" . implode( "'),('", range( 0, 9999 ) ) . "')",
+                       "INSERT INTO insert_table (field) VALUES ('" . implode( "'),('", range( 10000, 19999 ) ) . "')",
+                       "INSERT INTO insert_table (field) VALUES ('" . implode( "'),('", range( 20000, 25000 ) ) . "')",
+                       'COMMIT'
+               ] ), $dbWeb );
+       }
+
        /**
         * @dataProvider provideReplace
         * @covers Wikimedia\Rdbms\Database::replace