d945e1d814694479cb52eb89c237e3b0ed327784
[lhc/web/wiklou.git] / includes / DatabasePostgres.php
1 <?php
2
3 /**
4 * This is the Postgres database abstraction layer.
5 *
6 * As it includes more generic version for DB functions,
7 * than MySQL ones, some of them should be moved to parent
8 * Database class.
9 *
10 */
11
12 class DatabasePostgres extends Database {
13 var $mInsertId = NULL;
14 var $mLastResult = NULL;
15 var $numeric_version = NULL;
16
17 function DatabasePostgres($server = false, $user = false, $password = false, $dbName = false,
18 $failFunction = false, $flags = 0 )
19 {
20
21 global $wgOut;
22 # Can't get a reference if it hasn't been set yet
23 if ( !isset( $wgOut ) ) {
24 $wgOut = NULL;
25 }
26 $this->mOut =& $wgOut;
27 $this->mFailFunction = $failFunction;
28 $this->mCascadingDeletes = true;
29 $this->mCleanupTriggers = true;
30 $this->mStrictIPs = true;
31 $this->mFlags = $flags;
32 $this->open( $server, $user, $password, $dbName);
33
34 }
35
36 function realTimestamps() {
37 return true;
38 }
39
40 function implicitGroupby() {
41 return false;
42 }
43
44 function searchableIPs() {
45 return true;
46 }
47
48 static function newFromParams( $server = false, $user = false, $password = false, $dbName = false,
49 $failFunction = false, $flags = 0)
50 {
51 return new DatabasePostgres( $server, $user, $password, $dbName, $failFunction, $flags );
52 }
53
54 /**
55 * Usually aborts on failure
56 * If the failFunction is set to a non-zero integer, returns success
57 */
58 function open( $server, $user, $password, $dbName ) {
59 # Test for Postgres support, to avoid suppressed fatal error
60 if ( !function_exists( 'pg_connect' ) ) {
61 throw new DBConnectionError( $this, "Postgres functions missing, have you compiled PHP with the --with-pgsql option?\n (Note: if you recently installed PHP, you may need to restart your webserver and database)\n" );
62 }
63
64
65 global $wgDBport;
66
67 $this->close();
68 $this->mServer = $server;
69 $port = $wgDBport;
70 $this->mUser = $user;
71 $this->mPassword = $password;
72 $this->mDBname = $dbName;
73
74 $hstring="";
75 if ($server!=false && $server!="") {
76 $hstring="host=$server ";
77 }
78 if ($port!=false && $port!="") {
79 $hstring .= "port=$port ";
80 }
81
82 if (!strlen($user)) { ## e.g. the class is being loaded
83 return;
84 }
85
86 error_reporting( E_ALL );
87 @$this->mConn = pg_connect("$hstring dbname=$dbName user=$user password=$password");
88
89 if ( $this->mConn == false ) {
90 wfDebug( "DB connection error\n" );
91 wfDebug( "Server: $server, Database: $dbName, User: $user, Password: " . substr( $password, 0, 3 ) . "...\n" );
92 wfDebug( $this->lastError()."\n" );
93 return false;
94 }
95
96 $this->mOpened = true;
97 ## If this is the initial connection, setup the schema stuff and possibly create the user
98 if (defined('MEDIAWIKI_INSTALL')) {
99 global $wgDBname, $wgDBuser, $wgDBpassword, $wgDBsuperuser, $wgDBmwschema,
100 $wgDBts2schema;
101
102 print "<li>Checking the version of Postgres...";
103 $version = $this->getServerVersion();
104 $PGMINVER = "8.1";
105 if ($this->numeric_version < $PGMINVER) {
106 print "<b>FAILED</b>. Required version is $PGMINVER. You have $this->numeric_version ($version)</li>\n";
107 dieout("</ul>");
108 }
109 print "version $this->numeric_version is OK.</li>\n";
110
111 $safeuser = $this->quote_ident($wgDBuser);
112 ## Are we connecting as a superuser for the first time?
113 if ($wgDBsuperuser) {
114 ## Are we really a superuser? Check out our rights
115 $SQL = "SELECT
116 CASE WHEN usesuper IS TRUE THEN
117 CASE WHEN usecreatedb IS TRUE THEN 3 ELSE 1 END
118 ELSE CASE WHEN usecreatedb IS TRUE THEN 2 ELSE 0 END
119 END AS rights
120 FROM pg_catalog.pg_user WHERE usename = " . $this->addQuotes($wgDBsuperuser);
121 $rows = $this->numRows($res = $this->doQuery($SQL));
122 if (!$rows) {
123 print "<li>ERROR: Could not read permissions for user \"$wgDBsuperuser\"</li>\n";
124 dieout('</ul>');
125 }
126 $perms = pg_fetch_result($res, 0, 0);
127
128 $SQL = "SELECT 1 FROM pg_catalog.pg_user WHERE usename = " . $this->addQuotes($wgDBuser);
129 $rows = $this->numRows($this->doQuery($SQL));
130 if ($rows) {
131 print "<li>User \"$wgDBuser\" already exists, skipping account creation.</li>";
132 }
133 else {
134 if ($perms != 1 and $perms != 3) {
135 print "<li>ERROR: the user \"$wgDBsuperuser\" cannot create other users. ";
136 print 'Please use a different Postgres user.</li>';
137 dieout('</ul>');
138 }
139 print "<li>Creating user <b>$wgDBuser</b>...";
140 $safepass = $this->addQuotes($wgDBpassword);
141 $SQL = "CREATE USER $safeuser NOCREATEDB PASSWORD $safepass";
142 $this->doQuery($SQL);
143 print "OK</li>\n";
144 }
145 ## User now exists, check out the database
146 if ($dbName != $wgDBname) {
147 $SQL = "SELECT 1 FROM pg_catalog.pg_database WHERE datname = " . $this->addQuotes($wgDBname);
148 $rows = $this->numRows($this->doQuery($SQL));
149 if ($rows) {
150 print "<li>Database \"$wgDBname\" already exists, skipping database creation.</li>";
151 }
152 else {
153 if ($perms < 2) {
154 print "<li>ERROR: the user \"$wgDBsuperuser\" cannot create databases. ";
155 print 'Please use a different Postgres user.</li>';
156 dieout('</ul>');
157 }
158 print "<li>Creating database <b>$wgDBname</b>...";
159 $safename = $this->quote_ident($wgDBname);
160 $SQL = "CREATE DATABASE $safename OWNER $safeuser ";
161 $this->doQuery($SQL);
162 print "OK</li>\n";
163 ## Hopefully tsearch2 and plpgsql are in template1...
164 }
165
166 ## Reconnect to check out tsearch2 rights for this user
167 print "<li>Connecting to \"$wgDBname\" as superuser \"$wgDBsuperuser\" to check rights...";
168 @$this->mConn = pg_connect("$hstring dbname=$wgDBname user=$user password=$password");
169 if ( $this->mConn == false ) {
170 print "<b>FAILED TO CONNECT!</b></li>";
171 dieout("</ul>");
172 }
173 print "OK</li>\n";
174 }
175
176 ## Tsearch2 checks
177 print "<li>Checking that tsearch2 is installed in the database \"$wgDBname\"...";
178 if (! $this->tableExists("pg_ts_cfg", $wgDBts2schema)) {
179 print "<b>FAILED</b>. tsearch2 must be installed in the database \"$wgDBname\".";
180 print "Please see <a href='http://www.devx.com/opensource/Article/21674/0/page/2'>this article</a>";
181 print " for instructions or ask on #postgresql on irc.freenode.net</li>\n";
182 dieout("</ul>");
183 }
184 print "OK</li>\n";
185 print "<li>Ensuring that user \"$wgDBuser\" has select rights on the tsearch2 tables...";
186 foreach (array('cfg','cfgmap','dict','parser') as $table) {
187 $SQL = "GRANT SELECT ON pg_ts_$table TO $safeuser";
188 $this->doQuery($SQL);
189 }
190 print "OK</li>\n";
191
192
193 ## Setup the schema for this user if needed
194 $result = $this->schemaExists($wgDBmwschema);
195 $safeschema = $this->quote_ident($wgDBmwschema);
196 if (!$result) {
197 print "<li>Creating schema <b>$wgDBmwschema</b> ...";
198 $result = $this->doQuery("CREATE SCHEMA $safeschema AUTHORIZATION $safeuser");
199 if (!$result) {
200 print "<b>FAILED</b>.</li>\n";
201 dieout("</ul>");
202 }
203 print "OK</li>\n";
204 }
205 else {
206 print "<li>Schema already exists, explicitly granting rights...\n";
207 $safeschema2 = $this->addQuotes($wgDBmwschema);
208 $SQL = "SELECT 'GRANT ALL ON '||pg_catalog.quote_ident(relname)||' TO $safeuser;'\n".
209 "FROM pg_catalog.pg_class p, pg_catalog.pg_namespace n\n".
210 "WHERE relnamespace = n.oid AND n.nspname = $safeschema2\n".
211 "AND p.relkind IN ('r','S','v')\n";
212 $SQL .= "UNION\n";
213 $SQL .= "SELECT 'GRANT ALL ON FUNCTION '||pg_catalog.quote_ident(proname)||'('||\n".
214 "pg_catalog.oidvectortypes(p.proargtypes)||') TO $safeuser;'\n".
215 "FROM pg_catalog.pg_proc p, pg_catalog.pg_namespace n\n".
216 "WHERE p.pronamespace = n.oid AND n.nspname = $safeschema2";
217 $res = $this->doQuery($SQL);
218 if (!$res) {
219 print "<b>FAILED</b>. Could not set rights for the user.</li>\n";
220 dieout("</ul>");
221 }
222 $this->doQuery("SET search_path = $safeschema");
223 $rows = $this->numRows($res);
224 while ($rows) {
225 $rows--;
226 $this->doQuery(pg_fetch_result($res, $rows, 0));
227 }
228 print "OK</li>";
229 }
230
231 $wgDBsuperuser = '';
232 return true; ## Reconnect as regular user
233 }
234
235 if (!defined('POSTGRES_SEARCHPATH')) {
236
237 ## Do we have the basic tsearch2 table?
238 print "<li>Checking for tsearch2 in the schema \"$wgDBts2schema\"...";
239 if (! $this->tableExists("pg_ts_dict", $wgDBts2schema)) {
240 print "<b>FAILED</b>. Make sure tsearch2 is installed. See <a href=";
241 print "'http://www.devx.com/opensource/Article/21674/0/page/2'>this article</a>";
242 print " for instructions.</li>\n";
243 dieout("</ul>");
244 }
245 print "OK</li>\n";
246
247 ## Does this user have the rights to the tsearch2 tables?
248 $ctype = pg_fetch_result($this->doQuery("SHOW lc_ctype"),0,0);
249 print "<li>Checking tsearch2 permissions...";
250 ## Let's check all four, just to be safe
251 error_reporting( 0 );
252 $ts2tables = array('cfg','cfgmap','dict','parser');
253 foreach ( $ts2tables AS $tname ) {
254 $SQL = "SELECT count(*) FROM $wgDBts2schema.pg_ts_$tname";
255 $res = $this->doQuery($SQL);
256 if (!$res) {
257 print "<b>FAILED</b> to access pg_ts_$tname. Make sure that the user ".
258 "\"$wgDBuser\" has SELECT access to all four tsearch2 tables</li>\n";
259 dieout("</ul>");
260 }
261 }
262 $SQL = "SELECT ts_name FROM $wgDBts2schema.pg_ts_cfg WHERE locale = '$ctype'";
263 $SQL .= " ORDER BY CASE WHEN ts_name <> 'default' THEN 1 ELSE 0 END";
264 $res = $this->doQuery($SQL);
265 error_reporting( E_ALL );
266 if (!$res) {
267 print "<b>FAILED</b>. Could not determine the tsearch2 locale information</li>\n";
268 dieout("</ul>");
269 }
270 print "OK</li>";
271
272 ## Will the current locale work? Can we force it to?
273 print "<li>Verifying tsearch2 locale with $ctype...";
274 $rows = $this->numRows($res);
275 $resetlocale = 0;
276 if (!$rows) {
277 print "<b>not found</b></li>\n";
278 print "<li>Attempting to set default tsearch2 locale to \"$ctype\"...";
279 $resetlocale = 1;
280 }
281 else {
282 $tsname = pg_fetch_result($res, 0, 0);
283 if ($tsname != 'default') {
284 print "<b>not set to default ($tsname)</b>";
285 print "<li>Attempting to change tsearch2 default locale to \"$ctype\"...";
286 $resetlocale = 1;
287 }
288 }
289 if ($resetlocale) {
290 $SQL = "UPDATE $wgDBts2schema.pg_ts_cfg SET locale = '$ctype' WHERE ts_name = 'default'";
291 $res = $this->doQuery($SQL);
292 if (!$res) {
293 print "<b>FAILED</b>. ";
294 print "Please make sure that the locale in pg_ts_cfg for \"default\" is set to \"$ctype\"</li>\n";
295 dieout("</ul>");
296 }
297 print "OK</li>";
298 }
299
300 ## Final test: try out a simple tsearch2 query
301 $SQL = "SELECT $wgDBts2schema.to_tsvector('default','MediaWiki tsearch2 testing')";
302 $res = $this->doQuery($SQL);
303 if (!$res) {
304 print "<b>FAILED</b>. Specifically, \"$SQL\" did not work.</li>";
305 dieout("</ul>");
306 }
307 print "OK</li>";
308
309 ## Do we have plpgsql installed?
310 print "<li>Checking for Pl/Pgsql ...";
311 $SQL = "SELECT 1 FROM pg_catalog.pg_language WHERE lanname = 'plpgsql'";
312 $rows = $this->numRows($this->doQuery($SQL));
313 if ($rows < 1) {
314 // plpgsql is not installed, but if we have a pg_pltemplate table, we should be able to create it
315 print "not installed. Attempting to install Pl/Pgsql ...";
316 $SQL = "SELECT 1 FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (n.oid = c.relnamespace) ".
317 "WHERE relname = 'pg_pltemplate' AND nspname='pg_catalog'";
318 $rows = $this->numRows($this->doQuery($SQL));
319 if ($rows >= 1) {
320 $result = $this->doQuery("CREATE LANGUAGE plpgsql");
321 if (!$result) {
322 print "<b>FAILED</b>. You need to install the language plpgsql in the database <tt>$wgDBname</tt></li>";
323 dieout("</ul>");
324 }
325 }
326 else {
327 print "<b>FAILED</b>. You need to install the language plpgsql in the database <tt>$wgDBname</tt></li>";
328 dieout("</ul>");
329 }
330 }
331 print "OK</li>\n";
332
333 ## Does the schema already exist? Who owns it?
334 $result = $this->schemaExists($wgDBmwschema);
335 if (!$result) {
336 print "<li>Creating schema <b>$wgDBmwschema</b> ...";
337 error_reporting( 0 );
338 $result = $this->doQuery("CREATE SCHEMA $wgDBmwschema");
339 error_reporting( E_ALL );
340 if (!$result) {
341 print "<b>FAILED</b>. The user \"$wgDBuser\" must be able to access the schema. ".
342 "You can try making them the owner of the database, or try creating the schema with a ".
343 "different user, and then grant access to the \"$wgDBuser\" user.</li>\n";
344 dieout("</ul>");
345 }
346 print "OK</li>\n";
347 }
348 else if ($result != $user) {
349 print "<li>Schema \"$wgDBmwschema\" exists but is not owned by \"$user\". Not ideal.</li>\n";
350 }
351 else {
352 print "<li>Schema \"$wgDBmwschema\" exists and is owned by \"$user\". Excellent.</li>\n";
353 }
354
355 ## Fix up the search paths if needed
356 print "<li>Setting the search path for user \"$user\" ...";
357 $path = $this->quote_ident($wgDBmwschema);
358 if ($wgDBts2schema !== $wgDBmwschema)
359 $path .= ", ". $this->quote_ident($wgDBts2schema);
360 if ($wgDBmwschema !== 'public' and $wgDBts2schema !== 'public')
361 $path .= ", public";
362 $SQL = "ALTER USER $safeuser SET search_path = $path";
363 $result = pg_query($this->mConn, $SQL);
364 if (!$result) {
365 print "<b>FAILED</b>.</li>\n";
366 dieout("</ul>");
367 }
368 print "OK</li>\n";
369 ## Set for the rest of this session
370 $SQL = "SET search_path = $path";
371 $result = pg_query($this->mConn, $SQL);
372 if (!$result) {
373 print "<li>Failed to set search_path</li>\n";
374 dieout("</ul>");
375 }
376 define( "POSTGRES_SEARCHPATH", $path );
377 }}
378
379 global $wgCommandLineMode;
380 ## If called from the command-line (e.g. importDump), only show errors
381 if ($wgCommandLineMode) {
382 $this->doQuery("SET client_min_messages = 'ERROR'");
383 }
384
385 return $this->mConn;
386 }
387
388 /**
389 * Closes a database connection, if it is open
390 * Returns success, true if already closed
391 */
392 function close() {
393 $this->mOpened = false;
394 if ( $this->mConn ) {
395 return pg_close( $this->mConn );
396 } else {
397 return true;
398 }
399 }
400
401 function doQuery( $sql ) {
402 return $this->mLastResult=pg_query( $this->mConn , $sql);
403 }
404
405 function queryIgnore( $sql, $fname = '' ) {
406 return $this->query( $sql, $fname, true );
407 }
408
409 function freeResult( $res ) {
410 if ( !@pg_free_result( $res ) ) {
411 throw new DBUnexpectedError($this, "Unable to free Postgres result\n" );
412 }
413 }
414
415 function fetchObject( $res ) {
416 @$row = pg_fetch_object( $res );
417 # FIXME: HACK HACK HACK HACK debug
418
419 # TODO:
420 # hashar : not sure if the following test really trigger if the object
421 # fetching failled.
422 if( pg_last_error($this->mConn) ) {
423 throw new DBUnexpectedError($this, 'SQL error: ' . htmlspecialchars( pg_last_error($this->mConn) ) );
424 }
425 return $row;
426 }
427
428 function fetchRow( $res ) {
429 @$row = pg_fetch_array( $res );
430 if( pg_last_error($this->mConn) ) {
431 throw new DBUnexpectedError($this, 'SQL error: ' . htmlspecialchars( pg_last_error($this->mConn) ) );
432 }
433 return $row;
434 }
435
436 function numRows( $res ) {
437 @$n = pg_num_rows( $res );
438 if( pg_last_error($this->mConn) ) {
439 throw new DBUnexpectedError($this, 'SQL error: ' . htmlspecialchars( pg_last_error($this->mConn) ) );
440 }
441 return $n;
442 }
443 function numFields( $res ) { return pg_num_fields( $res ); }
444 function fieldName( $res, $n ) { return pg_field_name( $res, $n ); }
445
446 /**
447 * This must be called after nextSequenceVal
448 */
449 function insertId() {
450 return $this->mInsertId;
451 }
452
453 function dataSeek( $res, $row ) { return pg_result_seek( $res, $row ); }
454 function lastError() {
455 if ( $this->mConn ) {
456 return pg_last_error();
457 }
458 else {
459 return "No database connection";
460 }
461 }
462 function lastErrno() {
463 return pg_last_error() ? 1 : 0;
464 }
465
466 function affectedRows() {
467 return pg_affected_rows( $this->mLastResult );
468 }
469
470 /**
471 * Returns information about an index
472 * If errors are explicitly ignored, returns NULL on failure
473 */
474 function indexInfo( $table, $index, $fname = 'Database::indexExists' ) {
475 $sql = "SELECT indexname FROM pg_indexes WHERE tablename='$table'";
476 $res = $this->query( $sql, $fname );
477 if ( !$res ) {
478 return NULL;
479 }
480
481 while ( $row = $this->fetchObject( $res ) ) {
482 if ( $row->indexname == $index ) {
483 return $row;
484
485 // BUG: !!!! This code needs to be synced up with database.php
486
487 }
488 }
489 return false;
490 }
491
492 function indexUnique ($table, $index, $fname = 'Database::indexUnique' ) {
493 $sql = "SELECT indexname FROM pg_indexes WHERE tablename='{$table}'".
494 " AND indexdef LIKE 'CREATE UNIQUE%({$index})'";
495 $res = $this->query( $sql, $fname );
496 if ( !$res )
497 return NULL;
498 while ($row = $this->fetchObject( $res ))
499 return true;
500 return false;
501
502 }
503
504 function insert( $table, $a, $fname = 'Database::insert', $options = array() ) {
505 # Postgres doesn't support options
506 # We have a go at faking one of them
507 # TODO: DELAYED, LOW_PRIORITY
508
509 if ( !is_array($options))
510 $options = array($options);
511
512 if ( in_array( 'IGNORE', $options ) )
513 $oldIgnore = $this->ignoreErrors( true );
514
515 # IGNORE is performed using single-row inserts, ignoring errors in each
516 # FIXME: need some way to distiguish between key collision and other types of error
517 $oldIgnore = $this->ignoreErrors( true );
518 if ( !is_array( reset( $a ) ) ) {
519 $a = array( $a );
520 }
521 foreach ( $a as $row ) {
522 parent::insert( $table, $row, $fname, array() );
523 }
524 $this->ignoreErrors( $oldIgnore );
525 $retVal = true;
526
527 if ( in_array( 'IGNORE', $options ) )
528 $this->ignoreErrors( $oldIgnore );
529
530 return $retVal;
531 }
532
533 function tableName( $name ) {
534 # Replace reserved words with better ones
535 switch( $name ) {
536 case 'user':
537 return 'mwuser';
538 case 'text':
539 return 'pagecontent';
540 default:
541 return $name;
542 }
543 }
544
545 /**
546 * Return the next in a sequence, save the value for retrieval via insertId()
547 */
548 function nextSequenceValue( $seqName ) {
549 $safeseq = preg_replace( "/'/", "''", $seqName );
550 $res = $this->query( "SELECT nextval('$safeseq')" );
551 $row = $this->fetchRow( $res );
552 $this->mInsertId = $row[0];
553 $this->freeResult( $res );
554 return $this->mInsertId;
555 }
556
557 /**
558 * Postgres does not have a "USE INDEX" clause, so return an empty string
559 */
560 function useIndexClause( $index ) {
561 return '';
562 }
563
564 # REPLACE query wrapper
565 # Postgres simulates this with a DELETE followed by INSERT
566 # $row is the row to insert, an associative array
567 # $uniqueIndexes is an array of indexes. Each element may be either a
568 # field name or an array of field names
569 #
570 # It may be more efficient to leave off unique indexes which are unlikely to collide.
571 # However if you do this, you run the risk of encountering errors which wouldn't have
572 # occurred in MySQL
573 function replace( $table, $uniqueIndexes, $rows, $fname = 'Database::replace' ) {
574 $table = $this->tableName( $table );
575
576 if (count($rows)==0) {
577 return;
578 }
579
580 # Single row case
581 if ( !is_array( reset( $rows ) ) ) {
582 $rows = array( $rows );
583 }
584
585 foreach( $rows as $row ) {
586 # Delete rows which collide
587 if ( $uniqueIndexes ) {
588 $sql = "DELETE FROM $table WHERE ";
589 $first = true;
590 foreach ( $uniqueIndexes as $index ) {
591 if ( $first ) {
592 $first = false;
593 $sql .= "(";
594 } else {
595 $sql .= ') OR (';
596 }
597 if ( is_array( $index ) ) {
598 $first2 = true;
599 foreach ( $index as $col ) {
600 if ( $first2 ) {
601 $first2 = false;
602 } else {
603 $sql .= ' AND ';
604 }
605 $sql .= $col.'=' . $this->addQuotes( $row[$col] );
606 }
607 } else {
608 $sql .= $index.'=' . $this->addQuotes( $row[$index] );
609 }
610 }
611 $sql .= ')';
612 $this->query( $sql, $fname );
613 }
614
615 # Now insert the row
616 $sql = "INSERT INTO $table (" . $this->makeList( array_keys( $row ), LIST_NAMES ) .') VALUES (' .
617 $this->makeList( $row, LIST_COMMA ) . ')';
618 $this->query( $sql, $fname );
619 }
620 }
621
622 # DELETE where the condition is a join
623 function deleteJoin( $delTable, $joinTable, $delVar, $joinVar, $conds, $fname = "Database::deleteJoin" ) {
624 if ( !$conds ) {
625 throw new DBUnexpectedError($this, 'Database::deleteJoin() called with empty $conds' );
626 }
627
628 $delTable = $this->tableName( $delTable );
629 $joinTable = $this->tableName( $joinTable );
630 $sql = "DELETE FROM $delTable WHERE $delVar IN (SELECT $joinVar FROM $joinTable ";
631 if ( $conds != '*' ) {
632 $sql .= 'WHERE ' . $this->makeList( $conds, LIST_AND );
633 }
634 $sql .= ')';
635
636 $this->query( $sql, $fname );
637 }
638
639 # Returns the size of a text field, or -1 for "unlimited"
640 function textFieldSize( $table, $field ) {
641 $table = $this->tableName( $table );
642 $sql = "SELECT t.typname as ftype,a.atttypmod as size
643 FROM pg_class c, pg_attribute a, pg_type t
644 WHERE relname='$table' AND a.attrelid=c.oid AND
645 a.atttypid=t.oid and a.attname='$field'";
646 $res =$this->query($sql);
647 $row=$this->fetchObject($res);
648 if ($row->ftype=="varchar") {
649 $size=$row->size-4;
650 } else {
651 $size=$row->size;
652 }
653 $this->freeResult( $res );
654 return $size;
655 }
656
657 function lowPriorityOption() {
658 return '';
659 }
660
661 function limitResult($sql, $limit,$offset) {
662 return "$sql LIMIT $limit ".(is_numeric($offset)?" OFFSET {$offset} ":"");
663 }
664
665 /**
666 * Returns an SQL expression for a simple conditional.
667 * Uses CASE on Postgres
668 *
669 * @param string $cond SQL expression which will result in a boolean value
670 * @param string $trueVal SQL expression to return if true
671 * @param string $falseVal SQL expression to return if false
672 * @return string SQL fragment
673 */
674 function conditional( $cond, $trueVal, $falseVal ) {
675 return " (CASE WHEN $cond THEN $trueVal ELSE $falseVal END) ";
676 }
677
678 function wasDeadlock() {
679 return $this->lastErrno() == '40P01';
680 }
681
682 function timestamp( $ts=0 ) {
683 return wfTimestamp(TS_POSTGRES,$ts);
684 }
685
686 /**
687 * Return aggregated value function call
688 */
689 function aggregateValue ($valuedata,$valuename='value') {
690 return $valuedata;
691 }
692
693
694 function reportQueryError( $error, $errno, $sql, $fname, $tempIgnore = false ) {
695 # Ignore errors during error handling to avoid infinite recursion
696 $ignore = $this->ignoreErrors( true );
697 ++$this->mErrorCount;
698
699 if ($ignore || $tempIgnore) {
700 wfDebug("SQL ERROR (ignored): $error\n");
701 $this->ignoreErrors( $ignore );
702 }
703 else {
704 $message = "A database error has occurred\n" .
705 "Query: $sql\n" .
706 "Function: $fname\n" .
707 "Error: $errno $error\n";
708 throw new DBUnexpectedError($this, $message);
709 }
710 }
711
712 /**
713 * @return string wikitext of a link to the server software's web site
714 */
715 function getSoftwareLink() {
716 return "[http://www.postgresql.org/ PostgreSQL]";
717 }
718
719 /**
720 * @return string Version information from the database
721 */
722 function getServerVersion() {
723 $version = pg_fetch_result($this->doQuery("SELECT version()"),0,0);
724 $thisver = array();
725 if (!preg_match('/PostgreSQL (\d+\.\d+)(\S+)/', $version, $thisver)) {
726 die("Could not determine the numeric version from $version!");
727 }
728 $this->numeric_version = $thisver[1];
729 return $version;
730 }
731
732
733 /**
734 * Query whether a given table exists (in the given schema, or the default mw one if not given)
735 */
736 function tableExists( $table, $schema = false ) {
737 global $wgDBmwschema;
738 if (! $schema )
739 $schema = $wgDBmwschema;
740 $etable = preg_replace("/'/", "''", $table);
741 $eschema = preg_replace("/'/", "''", $schema);
742 $SQL = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n "
743 . "WHERE c.relnamespace = n.oid AND c.relname = '$etable' AND n.nspname = '$eschema' "
744 . "AND c.relkind IN ('r','v')";
745 $res = $this->query( $SQL );
746 $count = $res ? pg_num_rows($res) : 0;
747 if ($res)
748 $this->freeResult( $res );
749 return $count;
750 }
751
752
753 /**
754 * Query whether a given schema exists. Returns the name of the owner
755 */
756 function schemaExists( $schema ) {
757 $eschema = preg_replace("/'/", "''", $schema);
758 $SQL = "SELECT rolname FROM pg_catalog.pg_namespace n, pg_catalog.pg_roles r "
759 ."WHERE n.nspowner=r.oid AND n.nspname = '$eschema'";
760 $res = $this->query( $SQL );
761 $owner = $res ? pg_num_rows($res) ? pg_fetch_result($res, 0, 0) : false : false;
762 if ($res)
763 $this->freeResult($res);
764 return $owner;
765 }
766
767 /**
768 * Query whether a given column exists in the mediawiki schema
769 */
770 function fieldExists( $table, $field ) {
771 global $wgDBmwschema;
772 $etable = preg_replace("/'/", "''", $table);
773 $eschema = preg_replace("/'/", "''", $wgDBmwschema);
774 $ecol = preg_replace("/'/", "''", $field);
775 $SQL = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n, pg_catalog.pg_attribute a "
776 . "WHERE c.relnamespace = n.oid AND c.relname = '$etable' AND n.nspname = '$eschema' "
777 . "AND a.attrelid = c.oid AND a.attname = '$ecol'";
778 $res = $this->query( $SQL );
779 $count = $res ? pg_num_rows($res) : 0;
780 if ($res)
781 $this->freeResult( $res );
782 return $count;
783 }
784
785 function fieldInfo( $table, $field ) {
786 $res = $this->query( "SELECT $field FROM $table LIMIT 1" );
787 $type = pg_field_type( $res, 0 );
788 return $type;
789 }
790
791 function begin( $fname = 'DatabasePostgrs::begin' ) {
792 $this->query( 'BEGIN', $fname );
793 $this->mTrxLevel = 1;
794 }
795 function immediateCommit( $fname = 'DatabasePostgres::immediateCommit' ) {
796 return true;
797 }
798 function commit( $fname = 'DatabasePostgres::commit' ) {
799 $this->query( 'COMMIT', $fname );
800 $this->mTrxLevel = 0;
801 }
802
803 /* Not even sure why this is used in the main codebase... */
804 function limitResultForUpdate($sql, $num) {
805 return $sql;
806 }
807
808 function setup_database() {
809 global $wgVersion, $wgDBmwschema, $wgDBts2schema, $wgDBport, $wgDBuser;
810
811 ## Make sure that we can write to the correct schema
812 ## If not, Postgres will happily and silently go to the next search_path item
813 $SQL = "CREATE TABLE $wgDBmwschema.mw_test_table(a int)";
814 error_reporting( 0 );
815 $res = $this->doQuery($SQL);
816 error_reporting( E_ALL );
817 if (!$res) {
818 print "<b>FAILED</b>. Make sure that the user \"$wgDBuser\" can write to the schema \"wgDBmwschema\"</li>\n";
819 dieout("</ul>");
820 }
821
822 dbsource( "../maintenance/postgres/tables.sql", $this);
823
824 ## Version-specific stuff
825 if ($this->numeric_version == 8.1) {
826 $this->doQuery("CREATE INDEX ts2_page_text ON pagecontent USING gist(textvector)");
827 $this->doQuery("CREATE INDEX ts2_page_title ON page USING gist(titlevector)");
828 }
829 else {
830 $this->doQuery("CREATE INDEX ts2_page_text ON pagecontent USING gin(textvector)");
831 $this->doQuery("CREATE INDEX ts2_page_title ON page USING gin(titlevector)");
832 }
833
834 ## Update version information
835 $mwv = $this->addQuotes($wgVersion);
836 $pgv = $this->addQuotes($this->getServerVersion());
837 $pgu = $this->addQuotes($this->mUser);
838 $mws = $this->addQuotes($wgDBmwschema);
839 $tss = $this->addQuotes($wgDBts2schema);
840 $pgp = $this->addQuotes($wgDBport);
841 $dbn = $this->addQuotes($this->mDBname);
842 $ctype = pg_fetch_result($this->doQuery("SHOW lc_ctype"),0,0);
843
844 $SQL = "UPDATE mediawiki_version SET mw_version=$mwv, pg_version=$pgv, pg_user=$pgu, ".
845 "mw_schema = $mws, ts2_schema = $tss, pg_port=$pgp, pg_dbname=$dbn, ".
846 "ctype = '$ctype' ".
847 "WHERE type = 'Creation'";
848 $this->query($SQL);
849
850 ## Avoid the non-standard "REPLACE INTO" syntax
851 $f = fopen( "../maintenance/interwiki.sql", 'r' );
852 if ($f == false ) {
853 dieout( "<li>Could not find the interwiki.sql file");
854 }
855 ## We simply assume it is already empty as we have just created it
856 $SQL = "INSERT INTO interwiki(iw_prefix,iw_url,iw_local) VALUES ";
857 while ( ! feof( $f ) ) {
858 $line = fgets($f,1024);
859 $matches = array();
860 if (!preg_match('/^\s*(\(.+?),(\d)\)/', $line, $matches)) {
861 continue;
862 }
863 $this->query("$SQL $matches[1],$matches[2])");
864 }
865 print " (table interwiki successfully populated)...\n";
866 }
867
868 function encodeBlob($b) {
869 return array('bytea',pg_escape_bytea($b));
870 }
871 function decodeBlob($b) {
872 return pg_unescape_bytea( $b );
873 }
874
875 function strencode( $s ) { ## Should not be called by us
876 return pg_escape_string( $s );
877 }
878
879 function addQuotes( $s ) {
880 if ( is_null( $s ) ) {
881 return 'NULL';
882 } else if (is_array( $s )) { ## Assume it is bytea data
883 return "E'$s[1]'";
884 }
885 return "'" . pg_escape_string($s) . "'";
886 // Unreachable: return "E'" . pg_escape_string($s) . "'";
887 }
888
889 function quote_ident( $s ) {
890 return '"' . preg_replace( '/"/', '""', $s) . '"';
891 }
892
893 /* For now, does nothing */
894 function selectDB( $db ) {
895 return true;
896 }
897
898 /**
899 * Returns an optional USE INDEX clause to go after the table, and a
900 * string to go at the end of the query
901 *
902 * @private
903 *
904 * @param array $options an associative array of options to be turned into
905 * an SQL query, valid keys are listed in the function.
906 * @return array
907 */
908 function makeSelectOptions( $options ) {
909 $preLimitTail = $postLimitTail = '';
910 $startOpts = '';
911
912 $noKeyOptions = array();
913 foreach ( $options as $key => $option ) {
914 if ( is_numeric( $key ) ) {
915 $noKeyOptions[$option] = true;
916 }
917 }
918
919 if ( isset( $options['GROUP BY'] ) ) $preLimitTail .= " GROUP BY {$options['GROUP BY']}";
920 if ( isset( $options['ORDER BY'] ) ) $preLimitTail .= " ORDER BY {$options['ORDER BY']}";
921
922 //if (isset($options['LIMIT'])) {
923 // $tailOpts .= $this->limitResult('', $options['LIMIT'],
924 // isset($options['OFFSET']) ? $options['OFFSET']
925 // : false);
926 //}
927
928 if ( isset( $noKeyOptions['FOR UPDATE'] ) ) $postLimitTail .= ' FOR UPDATE';
929 if ( isset( $noKeyOptions['LOCK IN SHARE MODE'] ) ) $postLimitTail .= ' LOCK IN SHARE MODE';
930 if ( isset( $noKeyOptions['DISTINCT'] ) && isset( $noKeyOptions['DISTINCTROW'] ) ) $startOpts .= 'DISTINCT';
931
932 if ( isset( $options['USE INDEX'] ) && ! is_array( $options['USE INDEX'] ) ) {
933 $useIndex = $this->useIndexClause( $options['USE INDEX'] );
934 } else {
935 $useIndex = '';
936 }
937
938 return array( $startOpts, $useIndex, $preLimitTail, $postLimitTail );
939 }
940
941 public function setTimeout( $timeout ) {
942 /// @fixme no-op
943 }
944
945 function ping() {
946 wfDebug( "Function ping() not written for DatabasePostgres.php yet");
947 return true;
948 }
949
950
951 } // end DatabasePostgres class
952
953 ?>