X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2FDatabasePostgres.php;h=8f8488b9ae17eaa4feea7f532c3e9723b71de916;hb=3857496fe2bda374229d0652a9cf51817cab165b;hp=ad2ecb8473350e41b8aaa69c07923fb0142a5169;hpb=3738fd4f72afffa33461822cc5bc28a389a97e5e;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/DatabasePostgres.php b/includes/DatabasePostgres.php index ad2ecb8473..8f8488b9ae 100644 --- a/includes/DatabasePostgres.php +++ b/includes/DatabasePostgres.php @@ -1,29 +1,91 @@ query(sprintf($q, + $db->addQuotes($wgDBmwschema), + $db->addQuotes($table), + $db->addQuotes($field))); + $row = $db->fetchObject($res); + if (!$row) + return null; + $n = new PostgresField; + $n->type = $row->typname; + $n->nullable = ($row->attnotnull == 'f'); + $n->name = $field; + $n->tablename = $table; + $n->max_length = $row->attlen; + return $n; + } + + function name() { + return $this->name; + } + + function tableName() { + return $this->tablename; + } + + function type() { + return $this->type; + } + + function nullable() { + return $this->nullable; + } + + function maxLength() { + return $this->max_length; + } +} /** - * Depends on database + * @ingroup Database */ -require_once( 'Database.php' ); - class DatabasePostgres extends Database { var $mInsertId = NULL; var $mLastResult = NULL; + var $numeric_version = NULL; function DatabasePostgres($server = false, $user = false, $password = false, $dbName = false, $failFunction = false, $flags = 0 ) { - global $wgOut, $wgDBprefix, $wgCommandLineMode; + global $wgOut; # Can't get a reference if it hasn't been set yet if ( !isset( $wgOut ) ) { $wgOut = NULL; @@ -31,13 +93,42 @@ class DatabasePostgres extends Database { $this->mOut =& $wgOut; $this->mFailFunction = $failFunction; $this->mFlags = $flags; - $this->open( $server, $user, $password, $dbName); } - static function newFromParams( $server = false, $user = false, $password = false, $dbName = false, - $failFunction = false, $flags = 0) + function cascadingDeletes() { + return true; + } + function cleanupTriggers() { + return true; + } + function strictIPs() { + return true; + } + function realTimestamps() { + return true; + } + function implicitGroupby() { + return false; + } + function implicitOrderby() { + return false; + } + function searchableIPs() { + return true; + } + function functionalIndexes() { + return true; + } + + function hasConstraint( $name ) { + global $wgDBmwschema; + $SQL = "SELECT 1 FROM pg_catalog.pg_constraint c, pg_catalog.pg_namespace n WHERE c.connamespace = n.oid AND conname = '" . pg_escape_string( $name ) . "' AND n.nspname = '" . pg_escape_string($wgDBmwschema) ."'"; + return $this->numRows($res = $this->doQuery($SQL)); + } + + static function newFromParams( $server, $user, $password, $dbName, $failFunction = false, $flags = 0) { return new DatabasePostgres( $server, $user, $password, $dbName, $failFunction, $flags ); } @@ -47,22 +138,24 @@ class DatabasePostgres extends Database { * If the failFunction is set to a non-zero integer, returns success */ function open( $server, $user, $password, $dbName ) { - # Test for PostgreSQL support, to avoid suppressed fatal error + # Test for Postgres support, to avoid suppressed fatal error if ( !function_exists( 'pg_connect' ) ) { - throw new DBConnectionError( $this, "PostgreSQL functions missing, have you compiled PHP with the --with-pgsql option?\n" ); + throw new DBConnectionError( $this, "Postgres functions missing, have you compiled PHP with the --with-pgsql option?\n (Note: if you recently installed PHP, you may need to restart your webserver and database)\n" ); } - global $wgDBport; + if (!strlen($user)) { ## e.g. the class is being loaded + return; + } + $this->close(); $this->mServer = $server; - $port = $wgDBport; + $this->mPort = $port = $wgDBport; $this->mUser = $user; $this->mPassword = $password; $this->mDBname = $dbName; - $success = false; $hstring=""; if ($server!=false && $server!="") { $hstring="host=$server "; @@ -71,10 +164,6 @@ class DatabasePostgres extends Database { $hstring .= "port=$port "; } - if (!strlen($user)) { ## e.g. the class is being loaded - return; - } - error_reporting( E_ALL ); @$this->mConn = pg_connect("$hstring dbname=$dbName user=$user password=$password"); @@ -86,155 +175,370 @@ class DatabasePostgres extends Database { } $this->mOpened = true; - ## If this is the initial connection, setup the schema stuff and possibly create the user - if (defined('MEDIAWIKI_INSTALL')) { - global $wgDBname, $wgDBuser, $wgDBpass, $wgDBsuperuser, $wgDBmwschema, $wgDBts2schema; - print "OK\n"; - ## Are we connecting as a superuser for the first time? - if ($wgDBsuperuser) { - $SQL = "SELECT 1 FROM pg_catalog.pg_user WHERE usename = " . $this->addQuotes($wgDBuser); + global $wgCommandLineMode; + ## If called from the command-line (e.g. importDump), only show errors + if ($wgCommandLineMode) { + $this->doQuery("SET client_min_messages = 'ERROR'"); + } + + global $wgDBmwschema, $wgDBts2schema; + if (isset( $wgDBmwschema ) && isset( $wgDBts2schema ) + && $wgDBmwschema !== 'mediawiki' + && preg_match( '/^\w+$/', $wgDBmwschema ) + && preg_match( '/^\w+$/', $wgDBts2schema ) + ) { + $safeschema = $this->quote_ident($wgDBmwschema); + $safeschema2 = $this->quote_ident($wgDBts2schema); + $this->doQuery("SET search_path = $safeschema, $wgDBts2schema, public"); + } + + return $this->mConn; + } + + + function initial_setup($password, $dbName) { + // If this is the initial connection, setup the schema stuff and possibly create the user + global $wgDBname, $wgDBuser, $wgDBpassword, $wgDBsuperuser, $wgDBmwschema, $wgDBts2schema; + + print "
  • Checking the version of Postgres..."; + $version = $this->getServerVersion(); + $PGMINVER = '8.1'; + if ($this->numeric_version < $PGMINVER) { + print "FAILED. Required version is $PGMINVER. You have $this->numeric_version ($version)
  • \n"; + dieout(""); + } + print "version $this->numeric_version is OK.\n"; + + $safeuser = $this->quote_ident($wgDBuser); + // Are we connecting as a superuser for the first time? + if ($wgDBsuperuser) { + // Are we really a superuser? Check out our rights + $SQL = "SELECT + CASE WHEN usesuper IS TRUE THEN + CASE WHEN usecreatedb IS TRUE THEN 3 ELSE 1 END + ELSE CASE WHEN usecreatedb IS TRUE THEN 2 ELSE 0 END + END AS rights + FROM pg_catalog.pg_user WHERE usename = " . $this->addQuotes($wgDBsuperuser); + $rows = $this->numRows($res = $this->doQuery($SQL)); + if (!$rows) { + print "
  • ERROR: Could not read permissions for user \"$wgDBsuperuser\"
  • \n"; + dieout(''); + } + $perms = pg_fetch_result($res, 0, 0); + + $SQL = "SELECT 1 FROM pg_catalog.pg_user WHERE usename = " . $this->addQuotes($wgDBuser); + $rows = $this->numRows($this->doQuery($SQL)); + if ($rows) { + print "
  • User \"$wgDBuser\" already exists, skipping account creation.
  • "; + } + else { + if ($perms != 1 and $perms != 3) { + print "
  • ERROR: the user \"$wgDBsuperuser\" cannot create other users. "; + print 'Please use a different Postgres user.
  • '; + dieout(''); + } + print "
  • Creating user $wgDBuser..."; + $safepass = $this->addQuotes($wgDBpassword); + $SQL = "CREATE USER $safeuser NOCREATEDB PASSWORD $safepass"; + $this->doQuery($SQL); + print "OK
  • \n"; + } + // User now exists, check out the database + if ($dbName != $wgDBname) { + $SQL = "SELECT 1 FROM pg_catalog.pg_database WHERE datname = " . $this->addQuotes($wgDBname); $rows = $this->numRows($this->doQuery($SQL)); if ($rows) { - print "
  • User \"$wgDBuser\" already exists, skipping account creation.
  • "; + print "
  • Database \"$wgDBname\" already exists, skipping database creation.
  • "; } else { - ## Can we create users? - $SQL = "SELECT 1 FROM pg_catalog.pg_user WHERE usesuper IS TRUE AND ". - "usename = " . $this->addQuotes($wgDBsuperuser); - $rows = $this->numRows($this->doQuery($SQL)); - if (!$rows) { - print "
  • ERROR: the user \"$wgDBsuperuser\" cannot create other users. "; + if ($perms < 2) { + print "
  • ERROR: the user \"$wgDBsuperuser\" cannot create databases. "; print 'Please use a different Postgres user.
  • '; dieout(''); } - print "
  • Creating user $wgDBuser..."; - $safepass = $this->addQuotes($wgDBpass); - $SQL = "CREATE USER \"$wgDBuser\" NOCREATEDB PASSWORD $safepass"; - $this->doQuery($SQL); - print "OK
  • \n"; - } - ## User now exists, check out the database - $safename = $this->addQuotes($wgDBname); - $SQL = "SELECT 1 FROM pg_catalog.pg_database WHERE datname = $safename"; - $rows = $this->numRows($this->doQuery($SQL)); - if ($rows) { - print "
  • Database \"$wgDBname\" already exists, skipping database creation.
  • "; - } - else { print "
  • Creating database $wgDBname..."; - $SQL = "CREATE DATABASE \"$wgDBname\" OWNER \"$wgDBuser\" "; + $safename = $this->quote_ident($wgDBname); + $SQL = "CREATE DATABASE $safename OWNER $safeuser "; $this->doQuery($SQL); print "OK
  • \n"; - ## Hopefully tsearch2 and plpgsql are in template1... + // Hopefully tsearch2 and plpgsql are in template1... } - ## Reconnect to check out tsearch2 rights for this user + // Reconnect to check out tsearch2 rights for this user print "
  • Connecting to \"$wgDBname\" as superuser \"$wgDBsuperuser\" to check rights..."; - @$this->mConn = pg_connect("$hstring dbname=$wgDBname user=$user password=$password"); + + $hstring=""; + if ($this->mServer!=false && $this->mServer!="") { + $hstring="host=$this->mServer "; + } + if ($this->mPort!=false && $this->mPort!="") { + $hstring .= "port=$this->mPort "; + } + + @$this->mConn = pg_connect("$hstring dbname=$wgDBname user=$wgDBsuperuser password=$password"); if ( $this->mConn == false ) { print "FAILED TO CONNECT!
  • "; - dieout(""); + dieout(""); } - print "OK!"; + print "OK\n"; + } + + if ($this->numeric_version < 8.3) { + // Tsearch2 checks print "
  • Checking that tsearch2 is installed in the database \"$wgDBname\"..."; if (! $this->tableExists("pg_ts_cfg", $wgDBts2schema)) { print "FAILED. tsearch2 must be installed in the database \"$wgDBname\"."; - print "Please see 'http://www.devx.com/opensource/Article/21674/0/page/2'>this article"; + print "Please see this article"; print " for instructions or ask on #postgresql on irc.freenode.net
  • \n"; dieout(""); - } + } print "OK\n"; - print "Ensuring that user \"$wgDBuser\" has select rights on the tsearch2 tables..."; + print "
  • Ensuring that user \"$wgDBuser\" has select rights on the tsearch2 tables..."; foreach (array('cfg','cfgmap','dict','parser') as $table) { - $SQL = "GRANT SELECT ON pg_ts_$table TO \"$wgDBuser\""; + $SQL = "GRANT SELECT ON pg_ts_$table TO $safeuser"; $this->doQuery($SQL); } + print "OK
  • \n"; + } - $wgDBsuperuser = ''; - return true; ## Reconnect as regular user + // Setup the schema for this user if needed + $result = $this->schemaExists($wgDBmwschema); + $safeschema = $this->quote_ident($wgDBmwschema); + if (!$result) { + print "
  • Creating schema $wgDBmwschema ..."; + $result = $this->doQuery("CREATE SCHEMA $safeschema AUTHORIZATION $safeuser"); + if (!$result) { + print "FAILED.
  • \n"; + dieout(""); + } + print "OK\n"; + } + else { + print "
  • Schema already exists, explicitly granting rights...\n"; + $safeschema2 = $this->addQuotes($wgDBmwschema); + $SQL = "SELECT 'GRANT ALL ON '||pg_catalog.quote_ident(relname)||' TO $safeuser;'\n". + "FROM pg_catalog.pg_class p, pg_catalog.pg_namespace n\n". + "WHERE relnamespace = n.oid AND n.nspname = $safeschema2\n". + "AND p.relkind IN ('r','S','v')\n"; + $SQL .= "UNION\n"; + $SQL .= "SELECT 'GRANT ALL ON FUNCTION '||pg_catalog.quote_ident(proname)||'('||\n". + "pg_catalog.oidvectortypes(p.proargtypes)||') TO $safeuser;'\n". + "FROM pg_catalog.pg_proc p, pg_catalog.pg_namespace n\n". + "WHERE p.pronamespace = n.oid AND n.nspname = $safeschema2"; + $res = $this->doQuery($SQL); + if (!$res) { + print "FAILED. Could not set rights for the user.
  • \n"; + dieout(""); + } + $this->doQuery("SET search_path = $safeschema"); + $rows = $this->numRows($res); + while ($rows) { + $rows--; + $this->doQuery(pg_fetch_result($res, $rows, 0)); + } + print "OK"; } + // Install plpgsql if needed + $this->setup_plpgsql(); + + $wgDBsuperuser = ''; + return true; // Reconnect as regular user + + } // end superuser + if (!defined('POSTGRES_SEARCHPATH')) { - ## Do we have the basic tsearch2 table? - print "
  • Checking for tsearch2 in the schema \"$wgDBts2schema\"..."; - if (! $this->tableExists("pg_ts_dict", $wgDBts2schema)) { - print "FAILED. Make sure tsearch2 is installed. See this article"; - print " for instructions.
  • \n"; - dieout(""); - } - print "OK\n"; + if ($this->numeric_version < 8.3) { + // Do we have the basic tsearch2 table? + print "
  • Checking for tsearch2 in the schema \"$wgDBts2schema\"..."; + if (! $this->tableExists("pg_ts_dict", $wgDBts2schema)) { + print "FAILED. Make sure tsearch2 is installed. See this article"; + print " for instructions.
  • \n"; + dieout(""); + } + print "OK\n"; - ## Does this user have the rights to the tsearch2 tables? - print "
  • Checking tsearch2 permissions..."; - $SQL = "SELECT 1 FROM $wgDBts2schema.pg_ts_cfg"; - error_reporting( 0 ); - $res = $this->doQuery($SQL); - error_reporting( E_ALL ); - if (!$res) { - print "FAILED. Make sure that the user \"$wgDBuser\" has SELECT access to the tsearch2 tables
  • \n"; - dieout(""); - } - print "OK"; + // Does this user have the rights to the tsearch2 tables? + $ctype = pg_fetch_result($this->doQuery("SHOW lc_ctype"),0,0); + print "
  • Checking tsearch2 permissions..."; + // Let's check all four, just to be safe + error_reporting( 0 ); + $ts2tables = array('cfg','cfgmap','dict','parser'); + $safetsschema = $this->quote_ident($wgDBts2schema); + foreach ( $ts2tables AS $tname ) { + $SQL = "SELECT count(*) FROM $safetsschema.pg_ts_$tname"; + $res = $this->doQuery($SQL); + if (!$res) { + print "FAILED to access pg_ts_$tname. Make sure that the user ". + "\"$wgDBuser\" has SELECT access to all four tsearch2 tables
  • \n"; + dieout(""); + } + } + $SQL = "SELECT ts_name FROM $safetsschema.pg_ts_cfg WHERE locale = '$ctype'"; + $SQL .= " ORDER BY CASE WHEN ts_name <> 'default' THEN 1 ELSE 0 END"; + $res = $this->doQuery($SQL); + error_reporting( E_ALL ); + if (!$res) { + print "FAILED. Could not determine the tsearch2 locale information\n"; + dieout(""); + } + print "OK"; + + // Will the current locale work? Can we force it to? + print "
  • Verifying tsearch2 locale with $ctype..."; + $rows = $this->numRows($res); + $resetlocale = 0; + if (!$rows) { + print "not found
  • \n"; + print "
  • Attempting to set default tsearch2 locale to \"$ctype\"..."; + $resetlocale = 1; + } + else { + $tsname = pg_fetch_result($res, 0, 0); + if ($tsname != 'default') { + print "not set to default ($tsname)"; + print "
  • Attempting to change tsearch2 default locale to \"$ctype\"..."; + $resetlocale = 1; + } + } + if ($resetlocale) { + $SQL = "UPDATE $safetsschema.pg_ts_cfg SET locale = '$ctype' WHERE ts_name = 'default'"; + $res = $this->doQuery($SQL); + if (!$res) { + print "FAILED. "; + print "Please make sure that the locale in pg_ts_cfg for \"default\" is set to \"$ctype\"
  • \n"; + dieout(""); + } + print "OK"; + } - ## Do we have plpgsql installed? - print "
  • Checking for plpgsql ..."; - $SQL = "SELECT 1 FROM pg_catalog.pg_language WHERE lanname = 'plpgsql'"; - $res = $this->doQuery($SQL); - $rows = $this->numRows($this->doQuery($SQL)); - if ($rows < 1) { - print "FAILED. Make sure the language plpgsql is installed for the database $wgDBnamet
  • "; - dieout(""); + // Final test: try out a simple tsearch2 query + $SQL = "SELECT $safetsschema.to_tsvector('default','MediaWiki tsearch2 testing')"; + $res = $this->doQuery($SQL); + if (!$res) { + print "FAILED. Specifically, \"$SQL\" did not work."; + dieout(""); + } + print "OK"; } - print "OK\n"; - ## Does the schema already exist? Who owns it? + // Install plpgsql if needed + $this->setup_plpgsql(); + + // Does the schema already exist? Who owns it? $result = $this->schemaExists($wgDBmwschema); if (!$result) { print "
  • Creating schema $wgDBmwschema ..."; - $result = $this->doQuery("CREATE SCHEMA $wgDBmwschema"); + error_reporting( 0 ); + $safeschema = $this->quote_ident($wgDBmwschema); + $result = $this->doQuery("CREATE SCHEMA $safeschema"); + error_reporting( E_ALL ); if (!$result) { - print "FAILED.
  • \n"; - return false; + print "FAILED. The user \"$wgDBuser\" must be able to access the schema. ". + "You can try making them the owner of the database, or try creating the schema with a ". + "different user, and then grant access to the \"$wgDBuser\" user.\n"; + dieout(""); } - print "ok\n"; + print "OK\n"; } - else if ($result != $user) { - print "
  • Schema \"$wgDBmwschema\" exists but is not owned by \"$user\". Not ideal.
  • \n"; + else if ($result != $wgDBuser) { + print "
  • Schema \"$wgDBmwschema\" exists but is not owned by \"$wgDBuser\". Not ideal.
  • \n"; } else { - print "
  • Schema \"$wgDBmwschema\" exists and is owned by \"$user\". Excellent.
  • \n"; + print "
  • Schema \"$wgDBmwschema\" exists and is owned by \"$wgDBuser\". Excellent.
  • \n"; + } + + // Always return GMT time to accomodate the existing integer-based timestamp assumption + print "
  • Setting the timezone to GMT for user \"$wgDBuser\" ..."; + $SQL = "ALTER USER $safeuser SET timezone = 'GMT'"; + $result = pg_query($this->mConn, $SQL); + if (!$result) { + print "FAILED.
  • \n"; + dieout(""); + } + print "OK\n"; + // Set for the rest of this session + $SQL = "SET timezone = 'GMT'"; + $result = pg_query($this->mConn, $SQL); + if (!$result) { + print "
  • Failed to set timezone
  • \n"; + dieout(""); } - ## Fix up the search paths if needed - print "
  • Setting the search path for user \"$user\" ..."; - $path = "$wgDBmwschema"; + print "
  • Setting the datestyle to ISO, YMD for user \"$wgDBuser\" ..."; + $SQL = "ALTER USER $safeuser SET datestyle = 'ISO, YMD'"; + $result = pg_query($this->mConn, $SQL); + if (!$result) { + print "FAILED.
  • \n"; + dieout(""); + } + print "OK\n"; + // Set for the rest of this session + $SQL = "SET datestyle = 'ISO, YMD'"; + $result = pg_query($this->mConn, $SQL); + if (!$result) { + print "
  • Failed to set datestyle
  • \n"; + dieout(""); + } + + // Fix up the search paths if needed + print "
  • Setting the search path for user \"$wgDBuser\" ..."; + $path = $this->quote_ident($wgDBmwschema); if ($wgDBts2schema !== $wgDBmwschema) - $path .= ", $wgDBts2schema"; + $path .= ", ". $this->quote_ident($wgDBts2schema); if ($wgDBmwschema !== 'public' and $wgDBts2schema !== 'public') $path .= ", public"; - $SQL = "ALTER USER $user SET search_path = $path"; + $SQL = "ALTER USER $safeuser SET search_path = $path"; $result = pg_query($this->mConn, $SQL); if (!$result) { print "FAILED.
  • \n"; - return false; + dieout(""); } - print "ok\n"; - ## Set for the rest of this session + print "OK\n"; + // Set for the rest of this session $SQL = "SET search_path = $path"; $result = pg_query($this->mConn, $SQL); if (!$result) { print "
  • Failed to set search_path
  • \n"; - return false; + dieout(""); } define( "POSTGRES_SEARCHPATH", $path ); - }} + } + } - return $this->mConn; + + function setup_plpgsql() { + print "
  • Checking for Pl/Pgsql ..."; + $SQL = "SELECT 1 FROM pg_catalog.pg_language WHERE lanname = 'plpgsql'"; + $rows = $this->numRows($this->doQuery($SQL)); + if ($rows < 1) { + // plpgsql is not installed, but if we have a pg_pltemplate table, we should be able to create it + print "not installed. Attempting to install Pl/Pgsql ..."; + $SQL = "SELECT 1 FROM pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n ON (n.oid = c.relnamespace) ". + "WHERE relname = 'pg_pltemplate' AND nspname='pg_catalog'"; + $rows = $this->numRows($this->doQuery($SQL)); + if ($rows >= 1) { + $olde = error_reporting(0); + error_reporting($olde - E_WARNING); + $result = $this->doQuery("CREATE LANGUAGE plpgsql"); + error_reporting($olde); + if (!$result) { + print "FAILED. You need to install the language plpgsql in the database $wgDBname
  • "; + dieout(""); + } + } + else { + print "FAILED. You need to install the language plpgsql in the database $wgDBname"; + dieout(""); + } + } + print "OK\n"; } + /** * Closes a database connection, if it is open * Returns success, true if already closed @@ -249,6 +553,9 @@ class DatabasePostgres extends Database { } function doQuery( $sql ) { + if (function_exists('mb_convert_encoding')) { + return $this->mLastResult=pg_query( $this->mConn , mb_convert_encoding($sql,'UTF-8') ); + } return $this->mLastResult=pg_query( $this->mConn , $sql); } @@ -257,18 +564,24 @@ class DatabasePostgres extends Database { } function freeResult( $res ) { + if ( $res instanceof ResultWrapper ) { + $res = $res->result; + } if ( !@pg_free_result( $res ) ) { - throw new DBUnexpectedError($this, "Unable to free PostgreSQL result\n" ); + throw new DBUnexpectedError($this, "Unable to free Postgres result\n" ); } } function fetchObject( $res ) { + if ( $res instanceof ResultWrapper ) { + $res = $res->result; + } @$row = pg_fetch_object( $res ); # FIXME: HACK HACK HACK HACK debug # TODO: # hashar : not sure if the following test really trigger if the object - # fetching failled. + # fetching failed. if( pg_last_error($this->mConn) ) { throw new DBUnexpectedError($this, 'SQL error: ' . htmlspecialchars( pg_last_error($this->mConn) ) ); } @@ -276,6 +589,9 @@ class DatabasePostgres extends Database { } function fetchRow( $res ) { + if ( $res instanceof ResultWrapper ) { + $res = $res->result; + } @$row = pg_fetch_array( $res ); if( pg_last_error($this->mConn) ) { throw new DBUnexpectedError($this, 'SQL error: ' . htmlspecialchars( pg_last_error($this->mConn) ) ); @@ -284,14 +600,27 @@ class DatabasePostgres extends Database { } function numRows( $res ) { + if ( $res instanceof ResultWrapper ) { + $res = $res->result; + } @$n = pg_num_rows( $res ); if( pg_last_error($this->mConn) ) { throw new DBUnexpectedError($this, 'SQL error: ' . htmlspecialchars( pg_last_error($this->mConn) ) ); } return $n; } - function numFields( $res ) { return pg_num_fields( $res ); } - function fieldName( $res, $n ) { return pg_field_name( $res, $n ); } + function numFields( $res ) { + if ( $res instanceof ResultWrapper ) { + $res = $res->result; + } + return pg_num_fields( $res ); + } + function fieldName( $res, $n ) { + if ( $res instanceof ResultWrapper ) { + $res = $res->result; + } + return pg_field_name( $res, $n ); + } /** * This must be called after nextSequenceVal @@ -300,7 +629,13 @@ class DatabasePostgres extends Database { return $this->mInsertId; } - function dataSeek( $res, $row ) { return pg_result_seek( $res, $row ); } + function dataSeek( $res, $row ) { + if ( $res instanceof ResultWrapper ) { + $res = $res->result; + } + return pg_result_seek( $res, $row ); + } + function lastError() { if ( $this->mConn ) { return pg_last_error(); @@ -309,12 +644,41 @@ class DatabasePostgres extends Database { return "No database connection"; } } - function lastErrno() { return 1; } + function lastErrno() { + return pg_last_error() ? 1 : 0; + } function affectedRows() { + if( !isset( $this->mLastResult ) or ! $this->mLastResult ) + return 0; + return pg_affected_rows( $this->mLastResult ); } + /** + * Estimate rows in dataset + * Returns estimated count, based on EXPLAIN output + * This is not necessarily an accurate estimate, so use sparingly + * Returns -1 if count cannot be found + * Takes same arguments as Database::select() + */ + + function estimateRowCount( $table, $vars='*', $conds='', $fname = 'Database::estimateRowCount', $options = array() ) { + $options['EXPLAIN'] = true; + $res = $this->select( $table, $vars, $conds, $fname, $options ); + $rows = -1; + if ( $res ) { + $row = $this->fetchRow( $res ); + $count = array(); + if( preg_match( '/rows=(\d+)/', $row[0], $count ) ) { + $rows = $count[1]; + } + $this->freeResult($res); + } + return $rows; + } + + /** * Returns information about an index * If errors are explicitly ignored, returns NULL on failure @@ -325,7 +689,6 @@ class DatabasePostgres extends Database { if ( !$res ) { return NULL; } - while ( $row = $this->fetchObject( $res ) ) { if ( $row->indexname == $index ) { return $row; @@ -346,46 +709,96 @@ class DatabasePostgres extends Database { } - function insert( $table, $a, $fname = 'Database::insert', $options = array() ) { - # PostgreSQL doesn't support options - # We have a go at faking one of them - # TODO: DELAYED, LOW_PRIORITY + /** + * INSERT wrapper, inserts an array into a table + * + * $args may be a single associative array, or an array of these with numeric keys, + * for multi-row insert (Postgres version 8.2 and above only). + * + * @param array $table String: Name of the table to insert to. + * @param array $args Array: Items to insert into the table. + * @param array $fname String: Name of the function, for profiling + * @param mixed $options String or Array. Valid options: IGNORE + * + * @return bool Success of insert operation. IGNORE always returns true. + */ + function insert( $table, $args, $fname = 'DatabasePostgres::insert', $options = array() ) { + global $wgDBversion; + + if ( !count( $args ) ) { + return true; + } - if ( !is_array($options)) - $options = array($options); + $table = $this->tableName( $table ); + if (! isset( $wgDBversion ) ) { + $this->getServerVersion(); + $wgDBversion = $this->numeric_version; + } - if ( in_array( 'IGNORE', $options ) ) - $oldIgnore = $this->ignoreErrors( true ); + if ( !is_array( $options ) ) + $options = array( $options ); - # IGNORE is performed using single-row inserts, ignoring errors in each - # FIXME: need some way to distiguish between key collision and other types of error - $oldIgnore = $this->ignoreErrors( true ); - if ( !is_array( reset( $a ) ) ) { - $a = array( $a ); + if ( isset( $args[0] ) && is_array( $args[0] ) ) { + $multi = true; + $keys = array_keys( $args[0] ); } - foreach ( $a as $row ) { - parent::insert( $table, $row, $fname, array() ); + else { + $multi = false; + $keys = array_keys( $args ); } - $this->ignoreErrors( $oldIgnore ); - $retVal = true; - if ( in_array( 'IGNORE', $options ) ) - $this->ignoreErrors( $oldIgnore ); + $ignore = in_array( 'IGNORE', $options ) ? 1 : 0; + if ( $ignore ) + $olde = error_reporting( 0 ); + + $sql = "INSERT INTO $table (" . implode( ',', $keys ) . ') VALUES '; + + if ( $multi ) { + if ( $wgDBversion >= 8.2 ) { + $first = true; + foreach ( $args as $row ) { + if ( $first ) { + $first = false; + } else { + $sql .= ','; + } + $sql .= '(' . $this->makeList( $row ) . ')'; + } + $res = (bool)$this->query( $sql, $fname, $ignore ); + } + else { + $res = true; + $origsql = $sql; + foreach ( $args as $row ) { + $tempsql = $origsql; + $tempsql .= '(' . $this->makeList( $row ) . ')'; + $tempres = (bool)$this->query( $tempsql, $fname, $ignore ); + if (! $tempres) + $res = false; + } + } + } + else { + $sql .= '(' . $this->makeList( $args ) . ')'; + $res = (bool)$this->query( $sql, $fname, $ignore ); + } + + if ( $ignore ) { + $olde = error_reporting( $olde ); + return true; + } + + return $res; - return $retVal; } function tableName( $name ) { - # Replace backticks into double quotes - $name = strtr($name,'`','"'); - - # Now quote PG reserved keywords + # Replace reserved words with better ones switch( $name ) { case 'user': - case 'old': - case 'group': - return '"' . $name . '"'; - + return 'mwuser'; + case 'text': + return 'pagecontent'; default: return $name; } @@ -404,15 +817,26 @@ class DatabasePostgres extends Database { } /** - * USE INDEX clause - * PostgreSQL doesn't have them and returns "" + * Return the current value of a sequence. Assumes it has ben nextval'ed in this session. + */ + function currentSequenceValue( $seqName ) { + $safeseq = preg_replace( "/'/", "''", $seqName ); + $res = $this->query( "SELECT currval('$safeseq')" ); + $row = $this->fetchRow( $res ); + $currval = $row[0]; + $this->freeResult( $res ); + return $currval; + } + + /** + * Postgres does not have a "USE INDEX" clause, so return an empty string */ function useIndexClause( $index ) { return ''; } # REPLACE query wrapper - # PostgreSQL simulates this with a DELETE followed by INSERT + # Postgres simulates this with a DELETE followed by INSERT # $row is the row to insert, an associative array # $uniqueIndexes is an array of indexes. Each element may be either a # field name or an array of field names @@ -508,13 +932,13 @@ class DatabasePostgres extends Database { return ''; } - function limitResult($sql, $limit,$offset) { + function limitResult($sql, $limit, $offset=false) { return "$sql LIMIT $limit ".(is_numeric($offset)?" OFFSET {$offset} ":""); } /** * Returns an SQL expression for a simple conditional. - * Uses CASE on PostgreSQL. + * Uses CASE on Postgres * * @param string $cond SQL expression which will result in a boolean value * @param string $trueVal SQL expression to return if true @@ -525,14 +949,12 @@ class DatabasePostgres extends Database { return " (CASE WHEN $cond THEN $trueVal ELSE $falseVal END) "; } - # FIXME: actually detecting deadlocks might be nice function wasDeadlock() { - return false; + return $this->lastErrno() == '40P01'; } - # Return DB-style timestamp used for MySQL schema function timestamp( $ts=0 ) { - return wfTimestamp(TS_DB,$ts); + return wfTimestamp(TS_POSTGRES,$ts); } /** @@ -544,17 +966,27 @@ class DatabasePostgres extends Database { function reportQueryError( $error, $errno, $sql, $fname, $tempIgnore = false ) { - $message = "A database error has occurred\n" . - "Query: $sql\n" . - "Function: $fname\n" . - "Error: $errno $error\n"; - throw new DBUnexpectedError($this, $message); + // Ignore errors during error handling to avoid infinite recursion + $ignore = $this->ignoreErrors( true ); + $this->mErrorCount++; + + if ($ignore || $tempIgnore) { + wfDebug("SQL ERROR (ignored): $error\n"); + $this->ignoreErrors( $ignore ); + } + else { + $message = "A database error has occurred\n" . + "Query: $sql\n" . + "Function: $fname\n" . + "Error: $errno $error\n"; + throw new DBUnexpectedError($this, $message); + } } /** * @return string wikitext of a link to the server software's web site */ - function getSoftwareLink() { + function getSoftwareLink() { return "[http://www.postgresql.org/ PostgreSQL]"; } @@ -562,32 +994,93 @@ class DatabasePostgres extends Database { * @return string Version information from the database */ function getServerVersion() { - $res = $this->query( "SELECT version()" ); - $row = $this->fetchRow( $res ); - $version = $row[0]; - $this->freeResult( $res ); + $version = pg_fetch_result($this->doQuery("SELECT version()"),0,0); + $thisver = array(); + if (!preg_match('/PostgreSQL (\d+\.\d+)(\S+)/', $version, $thisver)) { + die("Could not determine the numeric version from $version!"); + } + $this->numeric_version = $thisver[1]; return $version; } /** - * Query whether a given table exists (in the given schema, or the default mw one if not given) + * Query whether a given relation exists (in the given schema, or the + * default mw one if not given) */ - function tableExists( $table, $schema = false ) { + function relationExists( $table, $types, $schema = false ) { global $wgDBmwschema; + if (!is_array($types)) + $types = array($types); if (! $schema ) $schema = $wgDBmwschema; - $etable = preg_replace("/'/", "''", $table); - $eschema = preg_replace("/'/", "''", $schema); + $etable = $this->addQuotes($table); + $eschema = $this->addQuotes($schema); $SQL = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n " - . "WHERE c.relnamespace = n.oid AND c.relname = '$etable' AND n.nspname = '$eschema'"; + . "WHERE c.relnamespace = n.oid AND c.relname = $etable AND n.nspname = $eschema " + . "AND c.relkind IN ('" . implode("','", $types) . "')"; $res = $this->query( $SQL ); - $count = $res ? pg_num_rows($res) : 0; + $count = $res ? $res->numRows() : 0; if ($res) $this->freeResult( $res ); - return $count; + return $count ? true : false; + } + + /* + * For backward compatibility, this function checks both tables and + * views. + */ + function tableExists ($table, $schema = false) { + return $this->relationExists($table, array('r', 'v'), $schema); + } + + function sequenceExists ($sequence, $schema = false) { + return $this->relationExists($sequence, 'S', $schema); } + function triggerExists($table, $trigger) { + global $wgDBmwschema; + + $q = <<query(sprintf($q, + $this->addQuotes($wgDBmwschema), + $this->addQuotes($table), + $this->addQuotes($trigger))); + if (!$res) + return NULL; + $rows = $res->numRows(); + $this->freeResult($res); + return $rows; + } + + function ruleExists($table, $rule) { + global $wgDBmwschema; + $exists = $this->selectField("pg_rules", "rulename", + array( "rulename" => $rule, + "tablename" => $table, + "schemaname" => $wgDBmwschema)); + return $exists === $rule; + } + + function constraintExists($table, $constraint) { + global $wgDBmwschema; + $SQL = sprintf("SELECT 1 FROM information_schema.table_constraints ". + "WHERE constraint_schema = %s AND table_name = %s AND constraint_name = %s", + $this->addQuotes($wgDBmwschema), + $this->addQuotes($table), + $this->addQuotes($constraint)); + $res = $this->query($SQL); + if (!$res) + return NULL; + $rows = $res->numRows(); + $this->freeResult($res); + return $rows; + } /** * Query whether a given schema exists. Returns the name of the owner @@ -597,7 +1090,12 @@ class DatabasePostgres extends Database { $SQL = "SELECT rolname FROM pg_catalog.pg_namespace n, pg_catalog.pg_roles r " ."WHERE n.nspowner=r.oid AND n.nspname = '$eschema'"; $res = $this->query( $SQL ); - $owner = $res ? pg_num_rows($res) ? pg_fetch_result($res, 0, 0) : false : false; + if ( $res && $res->numRows() ) { + $row = $res->fetchObject(); + $owner = $row->rolname; + } else { + $owner = false; + } if ($res) $this->freeResult($res); return $owner; @@ -606,7 +1104,7 @@ class DatabasePostgres extends Database { /** * Query whether a given column exists in the mediawiki schema */ - function fieldExists( $table, $field ) { + function fieldExists( $table, $field, $fname = 'DatabasePostgres::fieldExists' ) { global $wgDBmwschema; $etable = preg_replace("/'/", "''", $table); $eschema = preg_replace("/'/", "''", $wgDBmwschema); @@ -614,20 +1112,18 @@ class DatabasePostgres extends Database { $SQL = "SELECT 1 FROM pg_catalog.pg_class c, pg_catalog.pg_namespace n, pg_catalog.pg_attribute a " . "WHERE c.relnamespace = n.oid AND c.relname = '$etable' AND n.nspname = '$eschema' " . "AND a.attrelid = c.oid AND a.attname = '$ecol'"; - $res = $this->query( $SQL ); - $count = $res ? pg_num_rows($res) : 0; + $res = $this->query( $SQL, $fname ); + $count = $res ? $res->numRows() : 0; if ($res) $this->freeResult( $res ); return $count; } function fieldInfo( $table, $field ) { - $res = $this->query( "SELECT $field FROM $table LIMIT 1" ); - $type = pg_field_type( $res, 0 ); - return $type; + return PostgresField::fromText($this, $table, $field); } - function begin( $fname = 'DatabasePostgrs::begin' ) { + function begin( $fname = 'DatabasePostgres::begin' ) { $this->query( 'BEGIN', $fname ); $this->mTrxLevel = 1; } @@ -645,9 +1141,26 @@ class DatabasePostgres extends Database { } function setup_database() { - global $wgVersion, $wgDBmwschema, $wgDBts2schema, $wgDBport; + global $wgVersion, $wgDBmwschema, $wgDBts2schema, $wgDBport, $wgDBuser; + + // Make sure that we can write to the correct schema + // If not, Postgres will happily and silently go to the next search_path item + $ctest = "mediawiki_test_table"; + $safeschema = $this->quote_ident($wgDBmwschema); + if ($this->tableExists($ctest, $wgDBmwschema)) { + $this->doQuery("DROP TABLE $safeschema.$ctest"); + } + $SQL = "CREATE TABLE $safeschema.$ctest(a int)"; + $olde = error_reporting( 0 ); + $res = $this->doQuery($SQL); + error_reporting( $olde ); + if (!$res) { + print "FAILED. Make sure that the user \"$wgDBuser\" can write to the schema \"$wgDBmwschema\"\n"; + dieout(""); + } + $this->doQuery("DROP TABLE $safeschema.$ctest"); - dbsource( "../maintenance/postgres/tables.sql", $this); + $res = dbsource( "../maintenance/postgres/tables.sql", $this); ## Update version information $mwv = $this->addQuotes($wgVersion); @@ -657,9 +1170,11 @@ class DatabasePostgres extends Database { $tss = $this->addQuotes($wgDBts2schema); $pgp = $this->addQuotes($wgDBport); $dbn = $this->addQuotes($this->mDBname); + $ctype = pg_fetch_result($this->doQuery("SHOW lc_ctype"),0,0); $SQL = "UPDATE mediawiki_version SET mw_version=$mwv, pg_version=$pgv, pg_user=$pgu, ". - "mw_schema = $mws, ts2_schema = $tss, pg_port=$pgp, pg_dbname=$dbn ". + "mw_schema = $mws, ts2_schema = $tss, pg_port=$pgp, pg_dbname=$dbn, ". + "ctype = '$ctype' ". "WHERE type = 'Creation'"; $this->query($SQL); @@ -672,19 +1187,25 @@ class DatabasePostgres extends Database { $SQL = "INSERT INTO interwiki(iw_prefix,iw_url,iw_local) VALUES "; while ( ! feof( $f ) ) { $line = fgets($f,1024); - if (!preg_match("/^\s*(\(.+?),(\d)\)/", $line, $matches)) { + $matches = array(); + if (!preg_match('/^\s*(\(.+?),(\d)\)/', $line, $matches)) { continue; } - $yesno = $matches[2]; ## ? "'true'" : "'false'"; $this->query("$SQL $matches[1],$matches[2])"); } print " (table interwiki successfully populated)...\n"; + + $this->doQuery("COMMIT"); } - function encodeBlob($b) { - return array('bytea',pg_escape_bytea($b)); + function encodeBlob( $b ) { + return new Blob ( pg_escape_bytea( $b ) ) ; } - function decodeBlob($b) { + + function decodeBlob( $b ) { + if ($b instanceof Blob) { + $b = $b->fetch(); + } return pg_unescape_bytea( $b ); } @@ -695,13 +1216,115 @@ class DatabasePostgres extends Database { function addQuotes( $s ) { if ( is_null( $s ) ) { return 'NULL'; - } else if (is_array( $s )) { ## Assume it is bytea data - return "E'$s[1]'"; + } else if ($s instanceof Blob) { + return "'".$s->fetch($s)."'"; } return "'" . pg_escape_string($s) . "'"; - return "E'" . pg_escape_string($s) . "'"; } -} + function quote_ident( $s ) { + return '"' . preg_replace( '/"/', '""', $s) . '"'; + } + + /* For now, does nothing */ + function selectDB( $db ) { + return true; + } + + /** + * Postgres specific version of replaceVars. + * Calls the parent version in Database.php + * + * @private + * + * @param string $com SQL string, read from a stream (usually tables.sql) + * + * @return string SQL string + */ + protected function replaceVars( $ins ) { + + $ins = parent::replaceVars( $ins ); + + if ($this->numeric_version >= 8.3) { + // Thanks for not providing backwards-compatibility, 8.3 + $ins = preg_replace( "/to_tsvector\s*\(\s*'default'\s*,/", 'to_tsvector(', $ins ); + } + + if ($this->numeric_version <= 8.1) { // Our minimum version + $ins = str_replace( 'USING gin', 'USING gist', $ins ); + } + + return $ins; + } + + /** + * Various select options + * + * @private + * + * @param array $options an associative array of options to be turned into + * an SQL query, valid keys are listed in the function. + * @return array + */ + function makeSelectOptions( $options ) { + $preLimitTail = $postLimitTail = ''; + $startOpts = $useIndex = ''; + + $noKeyOptions = array(); + foreach ( $options as $key => $option ) { + if ( is_numeric( $key ) ) { + $noKeyOptions[$option] = true; + } + } + + if ( isset( $options['GROUP BY'] ) ) $preLimitTail .= " GROUP BY " . $options['GROUP BY']; + if ( isset( $options['HAVING'] ) ) $preLimitTail .= " HAVING {$options['HAVING']}"; + if ( isset( $options['ORDER BY'] ) ) $preLimitTail .= " ORDER BY " . $options['ORDER BY']; + + //if (isset($options['LIMIT'])) { + // $tailOpts .= $this->limitResult('', $options['LIMIT'], + // isset($options['OFFSET']) ? $options['OFFSET'] + // : false); + //} + + if ( isset( $noKeyOptions['FOR UPDATE'] ) ) $postLimitTail .= ' FOR UPDATE'; + if ( isset( $noKeyOptions['LOCK IN SHARE MODE'] ) ) $postLimitTail .= ' LOCK IN SHARE MODE'; + if ( isset( $noKeyOptions['DISTINCT'] ) || isset( $noKeyOptions['DISTINCTROW'] ) ) $startOpts .= 'DISTINCT'; + + return array( $startOpts, $useIndex, $preLimitTail, $postLimitTail ); + } + + public function setTimeout( $timeout ) { + // @todo fixme no-op + } + + function ping() { + wfDebug( "Function ping() not written for DatabasePostgres.php yet"); + return true; + } + + /** + * How lagged is this slave? + * + */ + public function getLag() { + # Not implemented for PostgreSQL + return false; + } + + function setFakeSlaveLag() {} + function setFakeMaster() {} + + function getDBname() { + return $this->mDBname; + } + + function getServer() { + return $this->mServer; + } + + function buildConcat( $stringList ) { + return implode( ' || ', $stringList ); + } -?> +} // end DatabasePostgres class