rdbms: Remove support for PostgreSQL < 9.2, and improve INSERT IGNORE for 9.5
[lhc/web/wiklou.git] / includes / libs / rdbms / database / DatabasePostgres.php
index 9ffcc8b..9c24787 100644 (file)
@@ -36,8 +36,6 @@ class DatabasePostgres extends Database {
 
        /** @var resource */
        protected $lastResultHandle = null;
-       /** @var int The number of rows affected as an integer */
-       protected $lastAffectedRowCount = null;
 
        /** @var float|string */
        private $numericVersion = null;
@@ -155,9 +153,7 @@ class DatabasePostgres extends Database {
                $this->query( "SET datestyle = 'ISO, YMD'", __METHOD__ );
                $this->query( "SET timezone = 'GMT'", __METHOD__ );
                $this->query( "SET standard_conforming_strings = on", __METHOD__ );
-               if ( $this->getServerVersion() >= 9.0 ) {
-                       $this->query( "SET bytea_output = 'escape'", __METHOD__ ); // PHP bug 53127
-               }
+               $this->query( "SET bytea_output = 'escape'", __METHOD__ ); // PHP bug 53127
 
                $this->determineCoreSchema( $this->schema );
                // The schema to be used is now in the search path; no need for explicit qualification
@@ -219,7 +215,6 @@ class DatabasePostgres extends Database {
                        throw new DBUnexpectedError( $this, "Unable to post new query to PostgreSQL\n" );
                }
                $this->lastResultHandle = pg_get_result( $conn );
-               $this->lastAffectedRowCount = null;
                if ( pg_result_error( $this->lastResultHandle ) ) {
                        return false;
                }
@@ -371,10 +366,6 @@ class DatabasePostgres extends Database {
        }
 
        protected function fetchAffectedRowCount() {
-               if ( !is_null( $this->lastAffectedRowCount ) ) {
-                       // Forced result for simulated queries
-                       return $this->lastAffectedRowCount;
-               }
                if ( !$this->lastResultHandle ) {
                        return 0;
                }
@@ -559,18 +550,7 @@ __INDEXATTR__;
                return parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds );
        }
 
-       /**
-        * INSERT wrapper, inserts an array into a table
-        *
-        * $args may be a single associative array, or an array of these with numeric keys,
-        * for multi-row insert (Postgres version 8.2 and above only).
-        *
-        * @param string $table Name of the table to insert to.
-        * @param array $args Items to insert into the table.
-        * @param string $fname Name of the function, for profiling
-        * @param array|string $options String or array. Valid options: IGNORE
-        * @return bool Success of insert operation. IGNORE always returns true.
-        */
+       /** @inheritDoc */
        public function insert( $table, $args, $fname = __METHOD__, $options = [] ) {
                if ( !count( $args ) ) {
                        return true;
@@ -586,98 +566,68 @@ __INDEXATTR__;
                }
 
                if ( isset( $args[0] ) && is_array( $args[0] ) ) {
-                       $multi = true;
+                       $rows = $args;
                        $keys = array_keys( $args[0] );
                } else {
-                       $multi = false;
+                       $rows = [ $args ];
                        $keys = array_keys( $args );
                }
 
-               // If IGNORE is set, we use savepoints to emulate mysql's behavior
-               // @todo If PostgreSQL 9.5+, we could use ON CONFLICT DO NOTHING instead
-               $savepoint = $olde = null;
-               $numrowsinserted = 0;
-               if ( in_array( 'IGNORE', $options ) ) {
-                       $savepoint = new SavepointPostgres( $this, 'mw', $this->queryLogger );
-                       $olde = error_reporting( 0 );
-                       // For future use, we may want to track the number of actual inserts
-                       // Right now, insert (all writes) simply return true/false
-               }
+               $ignore = in_array( 'IGNORE', $options );
 
                $sql = "INSERT INTO $table (" . implode( ',', $keys ) . ') VALUES ';
 
-               if ( $multi ) {
-                       if ( $this->numericVersion >= 8.2 && !$savepoint ) {
-                               $first = true;
-                               foreach ( $args as $row ) {
-                                       if ( $first ) {
-                                               $first = false;
-                                       } else {
-                                               $sql .= ',';
-                                       }
-                                       $sql .= '(' . $this->makeList( $row ) . ')';
+               if ( $this->numericVersion >= 9.5 || !$ignore ) {
+                       // No IGNORE or our PG has "ON CONFLICT DO NOTHING"
+                       $first = true;
+                       foreach ( $rows as $row ) {
+                               if ( $first ) {
+                                       $first = false;
+                               } else {
+                                       $sql .= ',';
                                }
-                               $res = (bool)$this->query( $sql, $fname, $savepoint );
-                       } else {
-                               $res = true;
-                               $origsql = $sql;
-                               foreach ( $args as $row ) {
-                                       $tempsql = $origsql;
+                               $sql .= '(' . $this->makeList( $row ) . ')';
+                       }
+                       if ( $ignore ) {
+                               $sql .= ' ON CONFLICT DO NOTHING';
+                       }
+                       $this->query( $sql, $fname );
+               } else {
+                       // Emulate IGNORE by doing each row individually, with savepoints
+                       // to roll back as necessary.
+                       $numrowsinserted = 0;
+
+                       $tok = $this->startAtomic( "$fname (outer)", self::ATOMIC_CANCELABLE );
+                       try {
+                               foreach ( $rows as $row ) {
+                                       $tempsql = $sql;
                                        $tempsql .= '(' . $this->makeList( $row ) . ')';
 
-                                       if ( $savepoint ) {
-                                               $savepoint->savepoint();
-                                       }
-
-                                       $tempres = (bool)$this->query( $tempsql, $fname, $savepoint );
-
-                                       if ( $savepoint ) {
-                                               $bar = pg_result_error( $this->lastResultHandle );
-                                               if ( $bar != false ) {
-                                                       $savepoint->rollback();
-                                               } else {
-                                                       $savepoint->release();
-                                                       $numrowsinserted++;
+                                       $this->startAtomic( "$fname (inner)", self::ATOMIC_CANCELABLE );
+                                       try {
+                                               $this->query( $tempsql, $fname );
+                                               $this->endAtomic( "$fname (inner)" );
+                                               $numrowsinserted++;
+                                       } catch ( DBQueryError $e ) {
+                                               $this->cancelAtomic( "$fname (inner)" );
+                                               // Our IGNORE is supposed to ignore duplicate key errors, but not others.
+                                               // (even though MySQL's version apparently ignores all errors)
+                                               if ( $e->errno !== '23505' ) {
+                                                       throw $e;
                                                }
                                        }
-
-                                       // If any of them fail, we fail overall for this function call
-                                       // Note that this will be ignored if IGNORE is set
-                                       if ( !$tempres ) {
-                                               $res = false;
-                                       }
-                               }
-                       }
-               } else {
-                       // Not multi, just a lone insert
-                       if ( $savepoint ) {
-                               $savepoint->savepoint();
-                       }
-
-                       $sql .= '(' . $this->makeList( $args ) . ')';
-                       $res = (bool)$this->query( $sql, $fname, $savepoint );
-                       if ( $savepoint ) {
-                               $bar = pg_result_error( $this->lastResultHandle );
-                               if ( $bar != false ) {
-                                       $savepoint->rollback();
-                               } else {
-                                       $savepoint->release();
-                                       $numrowsinserted++;
                                }
+                       } catch ( Exception $e ) {
+                               $this->cancelAtomic( "$fname (outer)", $tok );
+                               throw $e;
                        }
-               }
-               if ( $savepoint ) {
-                       error_reporting( $olde );
-                       $savepoint->commit();
+                       $this->endAtomic( "$fname (outer)" );
 
                        // Set the affected row count for the whole operation
-                       $this->lastAffectedRowCount = $numrowsinserted;
-
-                       // IGNORE always returns true
-                       return true;
+                       $this->affectedRowCount = $numrowsinserted;
                }
 
-               return $res;
+               return true;
        }
 
        /**
@@ -707,14 +657,31 @@ __INDEXATTR__;
                        $insertOptions = [ $insertOptions ];
                }
 
-               /*
-                * If IGNORE is set, use the non-native version.
-                * @todo If PostgreSQL 9.5+, we could use ON CONFLICT DO NOTHING
-                */
                if ( in_array( 'IGNORE', $insertOptions ) ) {
-                       return $this->nonNativeInsertSelect(
-                               $destTable, $srcTable, $varMap, $conds, $fname, $insertOptions, $selectOptions, $selectJoinConds
-                       );
+                       if ( $this->getServerVersion() >= 9.5 ) {
+                               // Use ON CONFLICT DO NOTHING if we have it for IGNORE
+                               $destTable = $this->tableName( $destTable );
+
+                               $selectSql = $this->selectSQLText(
+                                       $srcTable,
+                                       array_values( $varMap ),
+                                       $conds,
+                                       $fname,
+                                       $selectOptions,
+                                       $selectJoinConds
+                               );
+
+                               $sql = "INSERT INTO $destTable (" . implode( ',', array_keys( $varMap ) ) . ') ' .
+                                       $selectSql . ' ON CONFLICT DO NOTHING';
+
+                               return $this->query( $sql, $fname );
+                       } else {
+                               // IGNORE and we don't have ON CONFLICT DO NOTHING, so just use the non-native version
+                               return $this->nonNativeInsertSelect(
+                                       $destTable, $srcTable, $varMap, $conds, $fname,
+                                       $insertOptions, $selectOptions, $selectJoinConds
+                               );
+                       }
                }
 
                return parent::nativeInsertSelect( $destTable, $srcTable, $varMap, $conds, $fname,
@@ -786,17 +753,17 @@ __INDEXATTR__;
        }
 
        public function wasDeadlock() {
-               // https://www.postgresql.org/docs/8.2/static/errcodes-appendix.html
+               // https://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
                return $this->lastErrno() === '40P01';
        }
 
        public function wasLockTimeout() {
-               // https://www.postgresql.org/docs/8.2/static/errcodes-appendix.html
+               // https://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
                return $this->lastErrno() === '55P03';
        }
 
        public function wasConnectionError( $errno ) {
-               // https://www.postgresql.org/docs/8.2/static/errcodes-appendix.html
+               // https://www.postgresql.org/docs/9.2/static/errcodes-appendix.html
                static $codes = [ '08000', '08003', '08006', '08001', '08004', '57P01', '57P03', '53300' ];
 
                return in_array( $errno, $codes, true );
@@ -1213,28 +1180,6 @@ SQL;
                return "'" . pg_escape_string( $conn, (string)$s ) . "'";
        }
 
-       /**
-        * Postgres specific version of replaceVars.
-        * Calls the parent version in Database.php
-        *
-        * @param string $ins SQL string, read from a stream (usually tables.sql)
-        * @return string SQL string
-        */
-       protected function replaceVars( $ins ) {
-               $ins = parent::replaceVars( $ins );
-
-               if ( $this->numericVersion >= 8.3 ) {
-                       // Thanks for not providing backwards-compatibility, 8.3
-                       $ins = preg_replace( "/to_tsvector\s*\(\s*'default'\s*,/", 'to_tsvector(', $ins );
-               }
-
-               if ( $this->numericVersion <= 8.1 ) { // Our minimum version
-                       $ins = str_replace( 'USING gin', 'USING gist', $ins );
-               }
-
-               return $ins;
-       }
-
        public function makeSelectOptions( $options ) {
                $preLimitTail = $postLimitTail = '';
                $startOpts = $useIndex = $ignoreIndex = '';
@@ -1332,7 +1277,7 @@ SQL;
                if ( !parent::lockIsFree( $lockName, $method ) ) {
                        return false; // already held
                }
-               // http://www.postgresql.org/docs/8.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
+               // http://www.postgresql.org/docs/9.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
                $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
                $result = $this->query( "SELECT (CASE(pg_try_advisory_lock($key))
                        WHEN 'f' THEN 'f' ELSE pg_advisory_unlock($key) END) AS lockstatus", $method );
@@ -1342,7 +1287,7 @@ SQL;
        }
 
        public function lock( $lockName, $method, $timeout = 5 ) {
-               // http://www.postgresql.org/docs/8.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
+               // http://www.postgresql.org/docs/9.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
                $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
                $loop = new WaitConditionLoop(
                        function () use ( $lockName, $key, $timeout, $method ) {
@@ -1362,7 +1307,7 @@ SQL;
        }
 
        public function unlock( $lockName, $method ) {
-               // http://www.postgresql.org/docs/8.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
+               // http://www.postgresql.org/docs/9.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
                $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
                $result = $this->query( "SELECT pg_advisory_unlock($key) as lockstatus", $method );
                $row = $this->fetchObject( $result );