Merge "Add CollationFa"
[lhc/web/wiklou.git] / includes / libs / rdbms / database / DatabasePostgres.php
1 <?php
2 /**
3 * This is the Postgres database abstraction layer.
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 * http://www.gnu.org/copyleft/gpl.html
19 *
20 * @file
21 * @ingroup Database
22 */
23 use Wikimedia\WaitConditionLoop;
24
25 /**
26 * @ingroup Database
27 */
28 class DatabasePostgres extends Database {
29 /** @var int|bool */
30 protected $port;
31
32 /** @var resource */
33 protected $mLastResult = null;
34 /** @var int The number of rows affected as an integer */
35 protected $mAffectedRows = null;
36
37 /** @var int */
38 private $mInsertId = null;
39 /** @var float|string */
40 private $numericVersion = null;
41 /** @var string Connect string to open a PostgreSQL connection */
42 private $connectString;
43 /** @var string */
44 private $mCoreSchema;
45 /** @var string[] Map of (reserved table name => alternate table name) */
46 private $keywordTableMap = [];
47
48 /**
49 * @see Database::__construct()
50 * @param array $params Additional parameters include:
51 * - keywordTableMap : Map of reserved table names to alternative table names to use
52 */
53 public function __construct( array $params ) {
54 $this->port = isset( $params['port'] ) ? $params['port'] : false;
55 $this->keywordTableMap = isset( $params['keywordTableMap'] )
56 ? $params['keywordTableMap']
57 : [];
58
59 parent::__construct( $params );
60 }
61
62 public function getType() {
63 return 'postgres';
64 }
65
66 public function implicitGroupby() {
67 return false;
68 }
69
70 public function implicitOrderby() {
71 return false;
72 }
73
74 public function hasConstraint( $name ) {
75 $conn = $this->getBindingHandle();
76
77 $sql = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n " .
78 "WHERE c.connamespace = n.oid AND conname = '" .
79 pg_escape_string( $conn, $name ) . "' AND n.nspname = '" .
80 pg_escape_string( $conn, $this->getCoreSchema() ) . "'";
81 $res = $this->doQuery( $sql );
82
83 return $this->numRows( $res );
84 }
85
86 public function open( $server, $user, $password, $dbName ) {
87 # Test for Postgres support, to avoid suppressed fatal error
88 if ( !function_exists( 'pg_connect' ) ) {
89 throw new DBConnectionError(
90 $this,
91 "Postgres functions missing, have you compiled PHP with the --with-pgsql\n" .
92 "option? (Note: if you recently installed PHP, you may need to restart your\n" .
93 "webserver and database)\n"
94 );
95 }
96
97 $this->mServer = $server;
98 $this->mUser = $user;
99 $this->mPassword = $password;
100 $this->mDBname = $dbName;
101
102 $connectVars = [
103 'dbname' => $dbName,
104 'user' => $user,
105 'password' => $password
106 ];
107 if ( $server != false && $server != '' ) {
108 $connectVars['host'] = $server;
109 }
110 if ( (int)$this->port > 0 ) {
111 $connectVars['port'] = (int)$this->port;
112 }
113 if ( $this->mFlags & self::DBO_SSL ) {
114 $connectVars['sslmode'] = 1;
115 }
116
117 $this->connectString = $this->makeConnectionString( $connectVars );
118 $this->close();
119 $this->installErrorHandler();
120
121 try {
122 // Use new connections to let LoadBalancer/LBFactory handle reuse
123 $this->mConn = pg_connect( $this->connectString, PGSQL_CONNECT_FORCE_NEW );
124 } catch ( Exception $ex ) {
125 $this->restoreErrorHandler();
126 throw $ex;
127 }
128
129 $phpError = $this->restoreErrorHandler();
130
131 if ( !$this->mConn ) {
132 $this->queryLogger->debug(
133 "DB connection error\n" .
134 "Server: $server, Database: $dbName, User: $user, Password: " .
135 substr( $password, 0, 3 ) . "...\n"
136 );
137 $this->queryLogger->debug( $this->lastError() . "\n" );
138 throw new DBConnectionError( $this, str_replace( "\n", ' ', $phpError ) );
139 }
140
141 $this->mOpened = true;
142
143 # If called from the command-line (e.g. importDump), only show errors
144 if ( $this->cliMode ) {
145 $this->doQuery( "SET client_min_messages = 'ERROR'" );
146 }
147
148 $this->query( "SET client_encoding='UTF8'", __METHOD__ );
149 $this->query( "SET datestyle = 'ISO, YMD'", __METHOD__ );
150 $this->query( "SET timezone = 'GMT'", __METHOD__ );
151 $this->query( "SET standard_conforming_strings = on", __METHOD__ );
152 if ( $this->getServerVersion() >= 9.0 ) {
153 $this->query( "SET bytea_output = 'escape'", __METHOD__ ); // PHP bug 53127
154 }
155
156 $this->determineCoreSchema( $this->mSchema );
157 // The schema to be used is now in the search path; no need for explicit qualification
158 $this->mSchema = '';
159
160 return $this->mConn;
161 }
162
163 /**
164 * Postgres doesn't support selectDB in the same way MySQL does. So if the
165 * DB name doesn't match the open connection, open a new one
166 * @param string $db
167 * @return bool
168 */
169 public function selectDB( $db ) {
170 if ( $this->mDBname !== $db ) {
171 return (bool)$this->open( $this->mServer, $this->mUser, $this->mPassword, $db );
172 } else {
173 return true;
174 }
175 }
176
177 /**
178 * @param string[] $vars
179 * @return string
180 */
181 private function makeConnectionString( $vars ) {
182 $s = '';
183 foreach ( $vars as $name => $value ) {
184 $s .= "$name='" . str_replace( "'", "\\'", $value ) . "' ";
185 }
186
187 return $s;
188 }
189
190 protected function closeConnection() {
191 return $this->mConn ? pg_close( $this->mConn ) : true;
192 }
193
194 public function doQuery( $sql ) {
195 $conn = $this->getBindingHandle();
196
197 $sql = mb_convert_encoding( $sql, 'UTF-8' );
198 // Clear previously left over PQresult
199 while ( $res = pg_get_result( $conn ) ) {
200 pg_free_result( $res );
201 }
202 if ( pg_send_query( $conn, $sql ) === false ) {
203 throw new DBUnexpectedError( $this, "Unable to post new query to PostgreSQL\n" );
204 }
205 $this->mLastResult = pg_get_result( $conn );
206 $this->mAffectedRows = null;
207 if ( pg_result_error( $this->mLastResult ) ) {
208 return false;
209 }
210
211 return $this->mLastResult;
212 }
213
214 protected function dumpError() {
215 $diags = [
216 PGSQL_DIAG_SEVERITY,
217 PGSQL_DIAG_SQLSTATE,
218 PGSQL_DIAG_MESSAGE_PRIMARY,
219 PGSQL_DIAG_MESSAGE_DETAIL,
220 PGSQL_DIAG_MESSAGE_HINT,
221 PGSQL_DIAG_STATEMENT_POSITION,
222 PGSQL_DIAG_INTERNAL_POSITION,
223 PGSQL_DIAG_INTERNAL_QUERY,
224 PGSQL_DIAG_CONTEXT,
225 PGSQL_DIAG_SOURCE_FILE,
226 PGSQL_DIAG_SOURCE_LINE,
227 PGSQL_DIAG_SOURCE_FUNCTION
228 ];
229 foreach ( $diags as $d ) {
230 $this->queryLogger->debug( sprintf( "PgSQL ERROR(%d): %s\n",
231 $d, pg_result_error_field( $this->mLastResult, $d ) ) );
232 }
233 }
234
235 public function reportQueryError( $error, $errno, $sql, $fname, $tempIgnore = false ) {
236 if ( $tempIgnore ) {
237 /* Check for constraint violation */
238 if ( $errno === '23505' ) {
239 parent::reportQueryError( $error, $errno, $sql, $fname, $tempIgnore );
240
241 return;
242 }
243 }
244 /* Transaction stays in the ERROR state until rolled back */
245 if ( $this->mTrxLevel ) {
246 $ignore = $this->ignoreErrors( true );
247 $this->rollback( __METHOD__ );
248 $this->ignoreErrors( $ignore );
249 }
250 parent::reportQueryError( $error, $errno, $sql, $fname, false );
251 }
252
253 public function freeResult( $res ) {
254 if ( $res instanceof ResultWrapper ) {
255 $res = $res->result;
256 }
257 MediaWiki\suppressWarnings();
258 $ok = pg_free_result( $res );
259 MediaWiki\restoreWarnings();
260 if ( !$ok ) {
261 throw new DBUnexpectedError( $this, "Unable to free Postgres result\n" );
262 }
263 }
264
265 public function fetchObject( $res ) {
266 if ( $res instanceof ResultWrapper ) {
267 $res = $res->result;
268 }
269 MediaWiki\suppressWarnings();
270 $row = pg_fetch_object( $res );
271 MediaWiki\restoreWarnings();
272 # @todo FIXME: HACK HACK HACK HACK debug
273
274 # @todo hashar: not sure if the following test really trigger if the object
275 # fetching failed.
276 $conn = $this->getBindingHandle();
277 if ( pg_last_error( $conn ) ) {
278 throw new DBUnexpectedError(
279 $this,
280 'SQL error: ' . htmlspecialchars( pg_last_error( $conn ) )
281 );
282 }
283
284 return $row;
285 }
286
287 public function fetchRow( $res ) {
288 if ( $res instanceof ResultWrapper ) {
289 $res = $res->result;
290 }
291 MediaWiki\suppressWarnings();
292 $row = pg_fetch_array( $res );
293 MediaWiki\restoreWarnings();
294
295 $conn = $this->getBindingHandle();
296 if ( pg_last_error( $conn ) ) {
297 throw new DBUnexpectedError(
298 $this,
299 'SQL error: ' . htmlspecialchars( pg_last_error( $conn ) )
300 );
301 }
302
303 return $row;
304 }
305
306 public function numRows( $res ) {
307 if ( $res instanceof ResultWrapper ) {
308 $res = $res->result;
309 }
310 MediaWiki\suppressWarnings();
311 $n = pg_num_rows( $res );
312 MediaWiki\restoreWarnings();
313
314 $conn = $this->getBindingHandle();
315 if ( pg_last_error( $conn ) ) {
316 throw new DBUnexpectedError(
317 $this,
318 'SQL error: ' . htmlspecialchars( pg_last_error( $conn ) )
319 );
320 }
321
322 return $n;
323 }
324
325 public function numFields( $res ) {
326 if ( $res instanceof ResultWrapper ) {
327 $res = $res->result;
328 }
329
330 return pg_num_fields( $res );
331 }
332
333 public function fieldName( $res, $n ) {
334 if ( $res instanceof ResultWrapper ) {
335 $res = $res->result;
336 }
337
338 return pg_field_name( $res, $n );
339 }
340
341 /**
342 * Return the result of the last call to nextSequenceValue();
343 * This must be called after nextSequenceValue().
344 *
345 * @return int|null
346 */
347 public function insertId() {
348 return $this->mInsertId;
349 }
350
351 public function dataSeek( $res, $row ) {
352 if ( $res instanceof ResultWrapper ) {
353 $res = $res->result;
354 }
355
356 return pg_result_seek( $res, $row );
357 }
358
359 public function lastError() {
360 if ( $this->mConn ) {
361 if ( $this->mLastResult ) {
362 return pg_result_error( $this->mLastResult );
363 } else {
364 return pg_last_error();
365 }
366 }
367
368 return $this->getLastPHPError() ?: 'No database connection';
369 }
370
371 public function lastErrno() {
372 if ( $this->mLastResult ) {
373 return pg_result_error_field( $this->mLastResult, PGSQL_DIAG_SQLSTATE );
374 } else {
375 return false;
376 }
377 }
378
379 public function affectedRows() {
380 if ( !is_null( $this->mAffectedRows ) ) {
381 // Forced result for simulated queries
382 return $this->mAffectedRows;
383 }
384 if ( empty( $this->mLastResult ) ) {
385 return 0;
386 }
387
388 return pg_affected_rows( $this->mLastResult );
389 }
390
391 /**
392 * Estimate rows in dataset
393 * Returns estimated count, based on EXPLAIN output
394 * This is not necessarily an accurate estimate, so use sparingly
395 * Returns -1 if count cannot be found
396 * Takes same arguments as Database::select()
397 *
398 * @param string $table
399 * @param string $vars
400 * @param string $conds
401 * @param string $fname
402 * @param array $options
403 * @return int
404 */
405 public function estimateRowCount( $table, $vars = '*', $conds = '',
406 $fname = __METHOD__, $options = []
407 ) {
408 $options['EXPLAIN'] = true;
409 $res = $this->select( $table, $vars, $conds, $fname, $options );
410 $rows = -1;
411 if ( $res ) {
412 $row = $this->fetchRow( $res );
413 $count = [];
414 if ( preg_match( '/rows=(\d+)/', $row[0], $count ) ) {
415 $rows = (int)$count[1];
416 }
417 }
418
419 return $rows;
420 }
421
422 public function indexInfo( $table, $index, $fname = __METHOD__ ) {
423 $sql = "SELECT indexname FROM pg_indexes WHERE tablename='$table'";
424 $res = $this->query( $sql, $fname );
425 if ( !$res ) {
426 return null;
427 }
428 foreach ( $res as $row ) {
429 if ( $row->indexname == $this->indexName( $index ) ) {
430 return $row;
431 }
432 }
433
434 return false;
435 }
436
437 public function indexAttributes( $index, $schema = false ) {
438 if ( $schema === false ) {
439 $schema = $this->getCoreSchema();
440 }
441 /*
442 * A subquery would be not needed if we didn't care about the order
443 * of attributes, but we do
444 */
445 $sql = <<<__INDEXATTR__
446
447 SELECT opcname,
448 attname,
449 i.indoption[s.g] as option,
450 pg_am.amname
451 FROM
452 (SELECT generate_series(array_lower(isub.indkey,1), array_upper(isub.indkey,1)) AS g
453 FROM
454 pg_index isub
455 JOIN pg_class cis
456 ON cis.oid=isub.indexrelid
457 JOIN pg_namespace ns
458 ON cis.relnamespace = ns.oid
459 WHERE cis.relname='$index' AND ns.nspname='$schema') AS s,
460 pg_attribute,
461 pg_opclass opcls,
462 pg_am,
463 pg_class ci
464 JOIN pg_index i
465 ON ci.oid=i.indexrelid
466 JOIN pg_class ct
467 ON ct.oid = i.indrelid
468 JOIN pg_namespace n
469 ON ci.relnamespace = n.oid
470 WHERE
471 ci.relname='$index' AND n.nspname='$schema'
472 AND attrelid = ct.oid
473 AND i.indkey[s.g] = attnum
474 AND i.indclass[s.g] = opcls.oid
475 AND pg_am.oid = opcls.opcmethod
476 __INDEXATTR__;
477 $res = $this->query( $sql, __METHOD__ );
478 $a = [];
479 if ( $res ) {
480 foreach ( $res as $row ) {
481 $a[] = [
482 $row->attname,
483 $row->opcname,
484 $row->amname,
485 $row->option ];
486 }
487 } else {
488 return null;
489 }
490
491 return $a;
492 }
493
494 public function indexUnique( $table, $index, $fname = __METHOD__ ) {
495 $sql = "SELECT indexname FROM pg_indexes WHERE tablename='{$table}'" .
496 " AND indexdef LIKE 'CREATE UNIQUE%(" .
497 $this->strencode( $this->indexName( $index ) ) .
498 ")'";
499 $res = $this->query( $sql, $fname );
500 if ( !$res ) {
501 return null;
502 }
503
504 return $res->numRows() > 0;
505 }
506
507 public function selectSQLText(
508 $table, $vars, $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
509 ) {
510 // Change the FOR UPDATE option as necessary based on the join conditions. Then pass
511 // to the parent function to get the actual SQL text.
512 // In Postgres when using FOR UPDATE, only the main table and tables that are inner joined
513 // can be locked. That means tables in an outer join cannot be FOR UPDATE locked. Trying to
514 // do so causes a DB error. This wrapper checks which tables can be locked and adjusts it
515 // accordingly.
516 // MySQL uses "ORDER BY NULL" as an optimization hint, but that is illegal in PostgreSQL.
517 if ( is_array( $options ) ) {
518 $forUpdateKey = array_search( 'FOR UPDATE', $options, true );
519 if ( $forUpdateKey !== false && $join_conds ) {
520 unset( $options[$forUpdateKey] );
521
522 foreach ( $join_conds as $table_cond => $join_cond ) {
523 if ( 0 === preg_match( '/^(?:LEFT|RIGHT|FULL)(?: OUTER)? JOIN$/i', $join_cond[0] ) ) {
524 $options['FOR UPDATE'][] = $table_cond;
525 }
526 }
527 }
528
529 if ( isset( $options['ORDER BY'] ) && $options['ORDER BY'] == 'NULL' ) {
530 unset( $options['ORDER BY'] );
531 }
532 }
533
534 return parent::selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds );
535 }
536
537 /**
538 * INSERT wrapper, inserts an array into a table
539 *
540 * $args may be a single associative array, or an array of these with numeric keys,
541 * for multi-row insert (Postgres version 8.2 and above only).
542 *
543 * @param string $table Name of the table to insert to.
544 * @param array $args Items to insert into the table.
545 * @param string $fname Name of the function, for profiling
546 * @param array|string $options String or array. Valid options: IGNORE
547 * @return bool Success of insert operation. IGNORE always returns true.
548 */
549 public function insert( $table, $args, $fname = __METHOD__, $options = [] ) {
550 if ( !count( $args ) ) {
551 return true;
552 }
553
554 $table = $this->tableName( $table );
555 if ( !isset( $this->numericVersion ) ) {
556 $this->getServerVersion();
557 }
558
559 if ( !is_array( $options ) ) {
560 $options = [ $options ];
561 }
562
563 if ( isset( $args[0] ) && is_array( $args[0] ) ) {
564 $multi = true;
565 $keys = array_keys( $args[0] );
566 } else {
567 $multi = false;
568 $keys = array_keys( $args );
569 }
570
571 // If IGNORE is set, we use savepoints to emulate mysql's behavior
572 $savepoint = $olde = null;
573 $numrowsinserted = 0;
574 if ( in_array( 'IGNORE', $options ) ) {
575 $savepoint = new SavepointPostgres( $this, 'mw', $this->queryLogger );
576 $olde = error_reporting( 0 );
577 // For future use, we may want to track the number of actual inserts
578 // Right now, insert (all writes) simply return true/false
579 }
580
581 $sql = "INSERT INTO $table (" . implode( ',', $keys ) . ') VALUES ';
582
583 if ( $multi ) {
584 if ( $this->numericVersion >= 8.2 && !$savepoint ) {
585 $first = true;
586 foreach ( $args as $row ) {
587 if ( $first ) {
588 $first = false;
589 } else {
590 $sql .= ',';
591 }
592 $sql .= '(' . $this->makeList( $row ) . ')';
593 }
594 $res = (bool)$this->query( $sql, $fname, $savepoint );
595 } else {
596 $res = true;
597 $origsql = $sql;
598 foreach ( $args as $row ) {
599 $tempsql = $origsql;
600 $tempsql .= '(' . $this->makeList( $row ) . ')';
601
602 if ( $savepoint ) {
603 $savepoint->savepoint();
604 }
605
606 $tempres = (bool)$this->query( $tempsql, $fname, $savepoint );
607
608 if ( $savepoint ) {
609 $bar = pg_result_error( $this->mLastResult );
610 if ( $bar != false ) {
611 $savepoint->rollback();
612 } else {
613 $savepoint->release();
614 $numrowsinserted++;
615 }
616 }
617
618 // If any of them fail, we fail overall for this function call
619 // Note that this will be ignored if IGNORE is set
620 if ( !$tempres ) {
621 $res = false;
622 }
623 }
624 }
625 } else {
626 // Not multi, just a lone insert
627 if ( $savepoint ) {
628 $savepoint->savepoint();
629 }
630
631 $sql .= '(' . $this->makeList( $args ) . ')';
632 $res = (bool)$this->query( $sql, $fname, $savepoint );
633 if ( $savepoint ) {
634 $bar = pg_result_error( $this->mLastResult );
635 if ( $bar != false ) {
636 $savepoint->rollback();
637 } else {
638 $savepoint->release();
639 $numrowsinserted++;
640 }
641 }
642 }
643 if ( $savepoint ) {
644 error_reporting( $olde );
645 $savepoint->commit();
646
647 // Set the affected row count for the whole operation
648 $this->mAffectedRows = $numrowsinserted;
649
650 // IGNORE always returns true
651 return true;
652 }
653
654 return $res;
655 }
656
657 /**
658 * INSERT SELECT wrapper
659 * $varMap must be an associative array of the form [ 'dest1' => 'source1', ... ]
660 * Source items may be literals rather then field names, but strings should
661 * be quoted with Database::addQuotes()
662 * $conds may be "*" to copy the whole table
663 * srcTable may be an array of tables.
664 * @todo FIXME: Implement this a little better (seperate select/insert)?
665 *
666 * @param string $destTable
667 * @param array|string $srcTable
668 * @param array $varMap
669 * @param array $conds
670 * @param string $fname
671 * @param array $insertOptions
672 * @param array $selectOptions
673 * @return bool
674 */
675 public function nativeInsertSelect(
676 $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__,
677 $insertOptions = [], $selectOptions = []
678 ) {
679 $destTable = $this->tableName( $destTable );
680
681 if ( !is_array( $insertOptions ) ) {
682 $insertOptions = [ $insertOptions ];
683 }
684
685 /*
686 * If IGNORE is set, we use savepoints to emulate mysql's behavior
687 * Ignore LOW PRIORITY option, since it is MySQL-specific
688 */
689 $savepoint = $olde = null;
690 $numrowsinserted = 0;
691 if ( in_array( 'IGNORE', $insertOptions ) ) {
692 $savepoint = new SavepointPostgres( $this, 'mw', $this->queryLogger );
693 $olde = error_reporting( 0 );
694 $savepoint->savepoint();
695 }
696
697 if ( !is_array( $selectOptions ) ) {
698 $selectOptions = [ $selectOptions ];
699 }
700 list( $startOpts, $useIndex, $tailOpts, $ignoreIndex ) =
701 $this->makeSelectOptions( $selectOptions );
702 if ( is_array( $srcTable ) ) {
703 $srcTable = implode( ',', array_map( [ &$this, 'tableName' ], $srcTable ) );
704 } else {
705 $srcTable = $this->tableName( $srcTable );
706 }
707
708 $sql = "INSERT INTO $destTable (" . implode( ',', array_keys( $varMap ) ) . ')' .
709 " SELECT $startOpts " . implode( ',', $varMap ) .
710 " FROM $srcTable $useIndex $ignoreIndex ";
711
712 if ( $conds != '*' ) {
713 $sql .= ' WHERE ' . $this->makeList( $conds, LIST_AND );
714 }
715
716 $sql .= " $tailOpts";
717
718 $res = (bool)$this->query( $sql, $fname, $savepoint );
719 if ( $savepoint ) {
720 $bar = pg_result_error( $this->mLastResult );
721 if ( $bar != false ) {
722 $savepoint->rollback();
723 } else {
724 $savepoint->release();
725 $numrowsinserted++;
726 }
727 error_reporting( $olde );
728 $savepoint->commit();
729
730 // Set the affected row count for the whole operation
731 $this->mAffectedRows = $numrowsinserted;
732
733 // IGNORE always returns true
734 return true;
735 }
736
737 return $res;
738 }
739
740 public function tableName( $name, $format = 'quoted' ) {
741 // Replace reserved words with better ones
742 $name = $this->remappedTableName( $name );
743
744 return parent::tableName( $name, $format );
745 }
746
747 /**
748 * @param string $name
749 * @return string Value of $name or remapped name if $name is a reserved keyword
750 */
751 public function remappedTableName( $name ) {
752 return isset( $this->keywordTableMap[$name] ) ? $this->keywordTableMap[$name] : $name;
753 }
754
755 /**
756 * @param string $name
757 * @param string $format
758 * @return string Qualified and encoded (if requested) table name
759 */
760 public function realTableName( $name, $format = 'quoted' ) {
761 return parent::tableName( $name, $format );
762 }
763
764 public function nextSequenceValue( $seqName ) {
765 $safeseq = str_replace( "'", "''", $seqName );
766 $res = $this->query( "SELECT nextval('$safeseq')" );
767 $row = $this->fetchRow( $res );
768 $this->mInsertId = $row[0];
769
770 return $this->mInsertId;
771 }
772
773 /**
774 * Return the current value of a sequence. Assumes it has been nextval'ed in this session.
775 *
776 * @param string $seqName
777 * @return int
778 */
779 public function currentSequenceValue( $seqName ) {
780 $safeseq = str_replace( "'", "''", $seqName );
781 $res = $this->query( "SELECT currval('$safeseq')" );
782 $row = $this->fetchRow( $res );
783 $currval = $row[0];
784
785 return $currval;
786 }
787
788 public function textFieldSize( $table, $field ) {
789 $table = $this->tableName( $table );
790 $sql = "SELECT t.typname as ftype,a.atttypmod as size
791 FROM pg_class c, pg_attribute a, pg_type t
792 WHERE relname='$table' AND a.attrelid=c.oid AND
793 a.atttypid=t.oid and a.attname='$field'";
794 $res = $this->query( $sql );
795 $row = $this->fetchObject( $res );
796 if ( $row->ftype == 'varchar' ) {
797 $size = $row->size - 4;
798 } else {
799 $size = $row->size;
800 }
801
802 return $size;
803 }
804
805 public function limitResult( $sql, $limit, $offset = false ) {
806 return "$sql LIMIT $limit " . ( is_numeric( $offset ) ? " OFFSET {$offset} " : '' );
807 }
808
809 public function wasDeadlock() {
810 return $this->lastErrno() == '40P01';
811 }
812
813 public function duplicateTableStructure(
814 $oldName, $newName, $temporary = false, $fname = __METHOD__
815 ) {
816 $newName = $this->addIdentifierQuotes( $newName );
817 $oldName = $this->addIdentifierQuotes( $oldName );
818
819 return $this->query( 'CREATE ' . ( $temporary ? 'TEMPORARY ' : '' ) . " TABLE $newName " .
820 "(LIKE $oldName INCLUDING DEFAULTS)", $fname );
821 }
822
823 public function listTables( $prefix = null, $fname = __METHOD__ ) {
824 $eschema = $this->addQuotes( $this->getCoreSchema() );
825 $result = $this->query(
826 "SELECT tablename FROM pg_tables WHERE schemaname = $eschema", $fname );
827 $endArray = [];
828
829 foreach ( $result as $table ) {
830 $vars = get_object_vars( $table );
831 $table = array_pop( $vars );
832 if ( !$prefix || strpos( $table, $prefix ) === 0 ) {
833 $endArray[] = $table;
834 }
835 }
836
837 return $endArray;
838 }
839
840 public function timestamp( $ts = 0 ) {
841 $ct = new ConvertibleTimestamp( $ts );
842
843 return $ct->getTimestamp( TS_POSTGRES );
844 }
845
846 /**
847 * Posted by cc[plus]php[at]c2se[dot]com on 25-Mar-2009 09:12
848 * to https://secure.php.net/manual/en/ref.pgsql.php
849 *
850 * Parsing a postgres array can be a tricky problem, he's my
851 * take on this, it handles multi-dimensional arrays plus
852 * escaping using a nasty regexp to determine the limits of each
853 * data-item.
854 *
855 * This should really be handled by PHP PostgreSQL module
856 *
857 * @since 1.19
858 * @param string $text Postgreql array returned in a text form like {a,b}
859 * @param string $output
860 * @param int|bool $limit
861 * @param int $offset
862 * @return string
863 */
864 private function pg_array_parse( $text, &$output, $limit = false, $offset = 1 ) {
865 if ( false === $limit ) {
866 $limit = strlen( $text ) - 1;
867 $output = [];
868 }
869 if ( '{}' == $text ) {
870 return $output;
871 }
872 do {
873 if ( '{' != $text[$offset] ) {
874 preg_match( "/(\\{?\"([^\"\\\\]|\\\\.)*\"|[^,{}]+)+([,}]+)/",
875 $text, $match, 0, $offset );
876 $offset += strlen( $match[0] );
877 $output[] = ( '"' != $match[1][0]
878 ? $match[1]
879 : stripcslashes( substr( $match[1], 1, -1 ) ) );
880 if ( '},' == $match[3] ) {
881 return $output;
882 }
883 } else {
884 $offset = $this->pg_array_parse( $text, $output, $limit, $offset + 1 );
885 }
886 } while ( $limit > $offset );
887
888 return $output;
889 }
890
891 public function aggregateValue( $valuedata, $valuename = 'value' ) {
892 return $valuedata;
893 }
894
895 public function getSoftwareLink() {
896 return '[{{int:version-db-postgres-url}} PostgreSQL]';
897 }
898
899 /**
900 * Return current schema (executes SELECT current_schema())
901 * Needs transaction
902 *
903 * @since 1.19
904 * @return string Default schema for the current session
905 */
906 public function getCurrentSchema() {
907 $res = $this->query( "SELECT current_schema()", __METHOD__ );
908 $row = $this->fetchRow( $res );
909
910 return $row[0];
911 }
912
913 /**
914 * Return list of schemas which are accessible without schema name
915 * This is list does not contain magic keywords like "$user"
916 * Needs transaction
917 *
918 * @see getSearchPath()
919 * @see setSearchPath()
920 * @since 1.19
921 * @return array List of actual schemas for the current sesson
922 */
923 public function getSchemas() {
924 $res = $this->query( "SELECT current_schemas(false)", __METHOD__ );
925 $row = $this->fetchRow( $res );
926 $schemas = [];
927
928 /* PHP pgsql support does not support array type, "{a,b}" string is returned */
929
930 return $this->pg_array_parse( $row[0], $schemas );
931 }
932
933 /**
934 * Return search patch for schemas
935 * This is different from getSchemas() since it contain magic keywords
936 * (like "$user").
937 * Needs transaction
938 *
939 * @since 1.19
940 * @return array How to search for table names schemas for the current user
941 */
942 public function getSearchPath() {
943 $res = $this->query( "SHOW search_path", __METHOD__ );
944 $row = $this->fetchRow( $res );
945
946 /* PostgreSQL returns SHOW values as strings */
947
948 return explode( ",", $row[0] );
949 }
950
951 /**
952 * Update search_path, values should already be sanitized
953 * Values may contain magic keywords like "$user"
954 * @since 1.19
955 *
956 * @param array $search_path List of schemas to be searched by default
957 */
958 private function setSearchPath( $search_path ) {
959 $this->query( "SET search_path = " . implode( ", ", $search_path ) );
960 }
961
962 /**
963 * Determine default schema for the current application
964 * Adjust this session schema search path if desired schema exists
965 * and is not alread there.
966 *
967 * We need to have name of the core schema stored to be able
968 * to query database metadata.
969 *
970 * This will be also called by the installer after the schema is created
971 *
972 * @since 1.19
973 *
974 * @param string $desiredSchema
975 */
976 public function determineCoreSchema( $desiredSchema ) {
977 $this->begin( __METHOD__, self::TRANSACTION_INTERNAL );
978 if ( $this->schemaExists( $desiredSchema ) ) {
979 if ( in_array( $desiredSchema, $this->getSchemas() ) ) {
980 $this->mCoreSchema = $desiredSchema;
981 $this->queryLogger->debug(
982 "Schema \"" . $desiredSchema . "\" already in the search path\n" );
983 } else {
984 /**
985 * Prepend our schema (e.g. 'mediawiki') in front
986 * of the search path
987 * Fixes bug 15816
988 */
989 $search_path = $this->getSearchPath();
990 array_unshift( $search_path,
991 $this->addIdentifierQuotes( $desiredSchema ) );
992 $this->setSearchPath( $search_path );
993 $this->mCoreSchema = $desiredSchema;
994 $this->queryLogger->debug(
995 "Schema \"" . $desiredSchema . "\" added to the search path\n" );
996 }
997 } else {
998 $this->mCoreSchema = $this->getCurrentSchema();
999 $this->queryLogger->debug(
1000 "Schema \"" . $desiredSchema . "\" not found, using current \"" .
1001 $this->mCoreSchema . "\"\n" );
1002 }
1003 /* Commit SET otherwise it will be rollbacked on error or IGNORE SELECT */
1004 $this->commit( __METHOD__, self::FLUSHING_INTERNAL );
1005 }
1006
1007 /**
1008 * Return schema name for core application tables
1009 *
1010 * @since 1.19
1011 * @return string Core schema name
1012 */
1013 public function getCoreSchema() {
1014 return $this->mCoreSchema;
1015 }
1016
1017 public function getServerVersion() {
1018 if ( !isset( $this->numericVersion ) ) {
1019 $conn = $this->getBindingHandle();
1020 $versionInfo = pg_version( $conn );
1021 if ( version_compare( $versionInfo['client'], '7.4.0', 'lt' ) ) {
1022 // Old client, abort install
1023 $this->numericVersion = '7.3 or earlier';
1024 } elseif ( isset( $versionInfo['server'] ) ) {
1025 // Normal client
1026 $this->numericVersion = $versionInfo['server'];
1027 } else {
1028 // Bug 16937: broken pgsql extension from PHP<5.3
1029 $this->numericVersion = pg_parameter_status( $conn, 'server_version' );
1030 }
1031 }
1032
1033 return $this->numericVersion;
1034 }
1035
1036 /**
1037 * Query whether a given relation exists (in the given schema, or the
1038 * default mw one if not given)
1039 * @param string $table
1040 * @param array|string $types
1041 * @param bool|string $schema
1042 * @return bool
1043 */
1044 private function relationExists( $table, $types, $schema = false ) {
1045 if ( !is_array( $types ) ) {
1046 $types = [ $types ];
1047 }
1048 if ( $schema === false ) {
1049 $schema = $this->getCoreSchema();
1050 }
1051 $etable = $this->addQuotes( $table );
1052 $eschema = $this->addQuotes( $schema );
1053 $sql = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
1054 . "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema "
1055 . "AND c.relkind IN ('" . implode( "','", $types ) . "')";
1056 $res = $this->query( $sql );
1057 $count = $res ? $res->numRows() : 0;
1058
1059 return (bool)$count;
1060 }
1061
1062 /**
1063 * For backward compatibility, this function checks both tables and views.
1064 * @param string $table
1065 * @param string $fname
1066 * @param bool|string $schema
1067 * @return bool
1068 */
1069 public function tableExists( $table, $fname = __METHOD__, $schema = false ) {
1070 return $this->relationExists( $table, [ 'r', 'v' ], $schema );
1071 }
1072
1073 public function sequenceExists( $sequence, $schema = false ) {
1074 return $this->relationExists( $sequence, 'S', $schema );
1075 }
1076
1077 public function triggerExists( $table, $trigger ) {
1078 $q = <<<SQL
1079 SELECT 1 FROM pg_class, pg_namespace, pg_trigger
1080 WHERE relnamespace=pg_namespace.oid AND relkind='r'
1081 AND tgrelid=pg_class.oid
1082 AND nspname=%s AND relname=%s AND tgname=%s
1083 SQL;
1084 $res = $this->query(
1085 sprintf(
1086 $q,
1087 $this->addQuotes( $this->getCoreSchema() ),
1088 $this->addQuotes( $table ),
1089 $this->addQuotes( $trigger )
1090 )
1091 );
1092 if ( !$res ) {
1093 return null;
1094 }
1095 $rows = $res->numRows();
1096
1097 return $rows;
1098 }
1099
1100 public function ruleExists( $table, $rule ) {
1101 $exists = $this->selectField( 'pg_rules', 'rulename',
1102 [
1103 'rulename' => $rule,
1104 'tablename' => $table,
1105 'schemaname' => $this->getCoreSchema()
1106 ]
1107 );
1108
1109 return $exists === $rule;
1110 }
1111
1112 public function constraintExists( $table, $constraint ) {
1113 $sql = sprintf( "SELECT 1 FROM information_schema.table_constraints " .
1114 "WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s",
1115 $this->addQuotes( $this->getCoreSchema() ),
1116 $this->addQuotes( $table ),
1117 $this->addQuotes( $constraint )
1118 );
1119 $res = $this->query( $sql );
1120 if ( !$res ) {
1121 return null;
1122 }
1123 $rows = $res->numRows();
1124
1125 return $rows;
1126 }
1127
1128 /**
1129 * Query whether a given schema exists. Returns true if it does, false if it doesn't.
1130 * @param string $schema
1131 * @return bool
1132 */
1133 public function schemaExists( $schema ) {
1134 if ( !strlen( $schema ) ) {
1135 return false; // short-circuit
1136 }
1137
1138 $exists = $this->selectField(
1139 '"pg_catalog"."pg_namespace"', 1, [ 'nspname' => $schema ], __METHOD__ );
1140
1141 return (bool)$exists;
1142 }
1143
1144 /**
1145 * Returns true if a given role (i.e. user) exists, false otherwise.
1146 * @param string $roleName
1147 * @return bool
1148 */
1149 public function roleExists( $roleName ) {
1150 $exists = $this->selectField( '"pg_catalog"."pg_roles"', 1,
1151 [ 'rolname' => $roleName ], __METHOD__ );
1152
1153 return (bool)$exists;
1154 }
1155
1156 /**
1157 * @var string $table
1158 * @var string $field
1159 * @return PostgresField|null
1160 */
1161 public function fieldInfo( $table, $field ) {
1162 return PostgresField::fromText( $this, $table, $field );
1163 }
1164
1165 /**
1166 * pg_field_type() wrapper
1167 * @param ResultWrapper|resource $res ResultWrapper or PostgreSQL query result resource
1168 * @param int $index Field number, starting from 0
1169 * @return string
1170 */
1171 public function fieldType( $res, $index ) {
1172 if ( $res instanceof ResultWrapper ) {
1173 $res = $res->result;
1174 }
1175
1176 return pg_field_type( $res, $index );
1177 }
1178
1179 public function encodeBlob( $b ) {
1180 return new PostgresBlob( pg_escape_bytea( $b ) );
1181 }
1182
1183 public function decodeBlob( $b ) {
1184 if ( $b instanceof PostgresBlob ) {
1185 $b = $b->fetch();
1186 } elseif ( $b instanceof Blob ) {
1187 return $b->fetch();
1188 }
1189
1190 return pg_unescape_bytea( $b );
1191 }
1192
1193 public function strencode( $s ) {
1194 // Should not be called by us
1195 return pg_escape_string( $this->getBindingHandle(), $s );
1196 }
1197
1198 public function addQuotes( $s ) {
1199 $conn = $this->getBindingHandle();
1200
1201 if ( is_null( $s ) ) {
1202 return 'NULL';
1203 } elseif ( is_bool( $s ) ) {
1204 return intval( $s );
1205 } elseif ( $s instanceof Blob ) {
1206 if ( $s instanceof PostgresBlob ) {
1207 $s = $s->fetch();
1208 } else {
1209 $s = pg_escape_bytea( $conn, $s->fetch() );
1210 }
1211 return "'$s'";
1212 }
1213
1214 return "'" . pg_escape_string( $conn, $s ) . "'";
1215 }
1216
1217 /**
1218 * Postgres specific version of replaceVars.
1219 * Calls the parent version in Database.php
1220 *
1221 * @param string $ins SQL string, read from a stream (usually tables.sql)
1222 * @return string SQL string
1223 */
1224 protected function replaceVars( $ins ) {
1225 $ins = parent::replaceVars( $ins );
1226
1227 if ( $this->numericVersion >= 8.3 ) {
1228 // Thanks for not providing backwards-compatibility, 8.3
1229 $ins = preg_replace( "/to_tsvector\s*\(\s*'default'\s*,/", 'to_tsvector(', $ins );
1230 }
1231
1232 if ( $this->numericVersion <= 8.1 ) { // Our minimum version
1233 $ins = str_replace( 'USING gin', 'USING gist', $ins );
1234 }
1235
1236 return $ins;
1237 }
1238
1239 public function makeSelectOptions( $options ) {
1240 $preLimitTail = $postLimitTail = '';
1241 $startOpts = $useIndex = $ignoreIndex = '';
1242
1243 $noKeyOptions = [];
1244 foreach ( $options as $key => $option ) {
1245 if ( is_numeric( $key ) ) {
1246 $noKeyOptions[$option] = true;
1247 }
1248 }
1249
1250 $preLimitTail .= $this->makeGroupByWithHaving( $options );
1251
1252 $preLimitTail .= $this->makeOrderBy( $options );
1253
1254 // if ( isset( $options['LIMIT'] ) ) {
1255 // $tailOpts .= $this->limitResult( '', $options['LIMIT'],
1256 // isset( $options['OFFSET'] ) ? $options['OFFSET']
1257 // : false );
1258 // }
1259
1260 if ( isset( $options['FOR UPDATE'] ) ) {
1261 $postLimitTail .= ' FOR UPDATE OF ' .
1262 implode( ', ', array_map( [ &$this, 'tableName' ], $options['FOR UPDATE'] ) );
1263 } elseif ( isset( $noKeyOptions['FOR UPDATE'] ) ) {
1264 $postLimitTail .= ' FOR UPDATE';
1265 }
1266
1267 if ( isset( $noKeyOptions['DISTINCT'] ) || isset( $noKeyOptions['DISTINCTROW'] ) ) {
1268 $startOpts .= 'DISTINCT';
1269 }
1270
1271 return [ $startOpts, $useIndex, $preLimitTail, $postLimitTail, $ignoreIndex ];
1272 }
1273
1274 public function getDBname() {
1275 return $this->mDBname;
1276 }
1277
1278 public function getServer() {
1279 return $this->mServer;
1280 }
1281
1282 public function buildConcat( $stringList ) {
1283 return implode( ' || ', $stringList );
1284 }
1285
1286 public function buildGroupConcatField(
1287 $delimiter, $table, $field, $conds = '', $options = [], $join_conds = []
1288 ) {
1289 $fld = "array_to_string(array_agg($field)," . $this->addQuotes( $delimiter ) . ')';
1290
1291 return '(' . $this->selectSQLText( $table, $fld, $conds, null, [], $join_conds ) . ')';
1292 }
1293
1294 public function buildStringCast( $field ) {
1295 return $field . '::text';
1296 }
1297
1298 public function streamStatementEnd( &$sql, &$newLine ) {
1299 # Allow dollar quoting for function declarations
1300 if ( substr( $newLine, 0, 4 ) == '$mw$' ) {
1301 if ( $this->delimiter ) {
1302 $this->delimiter = false;
1303 } else {
1304 $this->delimiter = ';';
1305 }
1306 }
1307
1308 return parent::streamStatementEnd( $sql, $newLine );
1309 }
1310
1311 public function lockIsFree( $lockName, $method ) {
1312 // http://www.postgresql.org/docs/8.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
1313 $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
1314 $result = $this->query( "SELECT (CASE(pg_try_advisory_lock($key))
1315 WHEN 'f' THEN 'f' ELSE pg_advisory_unlock($key) END) AS lockstatus", $method );
1316 $row = $this->fetchObject( $result );
1317
1318 return ( $row->lockstatus === 't' );
1319 }
1320
1321 public function lock( $lockName, $method, $timeout = 5 ) {
1322 // http://www.postgresql.org/docs/8.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
1323 $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
1324 $loop = new WaitConditionLoop(
1325 function () use ( $lockName, $key, $timeout, $method ) {
1326 $res = $this->query( "SELECT pg_try_advisory_lock($key) AS lockstatus", $method );
1327 $row = $this->fetchObject( $res );
1328 if ( $row->lockstatus === 't' ) {
1329 parent::lock( $lockName, $method, $timeout ); // record
1330 return true;
1331 }
1332
1333 return WaitConditionLoop::CONDITION_CONTINUE;
1334 },
1335 $timeout
1336 );
1337
1338 return ( $loop->invoke() === $loop::CONDITION_REACHED );
1339 }
1340
1341 public function unlock( $lockName, $method ) {
1342 // http://www.postgresql.org/docs/8.2/static/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
1343 $key = $this->addQuotes( $this->bigintFromLockName( $lockName ) );
1344 $result = $this->query( "SELECT pg_advisory_unlock($key) as lockstatus", $method );
1345 $row = $this->fetchObject( $result );
1346
1347 if ( $row->lockstatus === 't' ) {
1348 parent::unlock( $lockName, $method ); // record
1349 return true;
1350 }
1351
1352 $this->queryLogger->debug( __METHOD__ . " failed to release lock\n" );
1353
1354 return false;
1355 }
1356
1357 /**
1358 * @param string $lockName
1359 * @return string Integer
1360 */
1361 private function bigintFromLockName( $lockName ) {
1362 return Wikimedia\base_convert( substr( sha1( $lockName ), 0, 15 ), 16, 10 );
1363 }
1364 }