use Wikimedia;
use BagOStuff;
use HashBagOStuff;
+use LogicException;
use InvalidArgumentException;
+use UnexpectedValueException;
use Exception;
use RuntimeException;
/** @var string Whether lock granularity is on the level of the entire database */
const ATTR_DB_LEVEL_LOCKING = 'db-level-locking';
+ /** @var int New Database instance will not be connected yet when returned */
+ const NEW_UNCONNECTED = 0;
+ /** @var int New Database instance will already be connected when returned */
+ const NEW_CONNECTED = 1;
+
/** @var string SQL query */
protected $lastQuery = '';
/** @var float|bool UNIX timestamp of last write query */
protected $lastWriteTime = false;
/** @var string|bool */
protected $phpError = false;
- /** @var string */
+ /** @var string Server that this instance is currently connected to */
protected $server;
- /** @var string */
+ /** @var string User that this instance is currently connected under the name of */
protected $user;
- /** @var string */
+ /** @var string Password used to establish the current connection */
protected $password;
- /** @var string */
+ /** @var string Database that this instance is currently connected to */
protected $dbName;
- /** @var array[] $aliases Map of (table => (dbname, schema, prefix) map) */
+ /** @var array[] Map of (table => (dbname, schema, prefix) map) */
protected $tableAliases = [];
+ /** @var string[] Map of (index alias => index) */
+ protected $indexAliases = [];
/** @var bool Whether this PHP instance is for a CLI script */
protected $cliMode;
/** @var string Agent name for query profiling */
protected $agent;
-
+ /** @var array Parameters used by initConnection() to establish a connection */
+ protected $connectionParams = [];
/** @var BagOStuff APC cache */
protected $srvCache;
/** @var LoggerInterface */
* @see Database::trxLevel
*/
private $trxAutomatic = false;
+ /**
+ * Counter for atomic savepoint identifiers. Reset when a new transaction begins.
+ *
+ * @var int
+ */
+ private $trxAtomicCounter = 0;
/**
* Array of levels of atomicity within transactions
*
protected $nonNativeInsertSelectBatchSize = 10000;
/**
- * Constructor and database handle and attempt to connect to the DB server
- *
- * IDatabase classes should not be constructed directly in external
- * code. Database::factory() should be used instead.
- *
+ * @note: exceptions for missing libraries/drivers should be thrown in initConnection()
* @param array $params Parameters passed from Database::factory()
*/
- function __construct( array $params ) {
- $server = $params['host'];
- $user = $params['user'];
- $password = $params['password'];
- $dbName = $params['dbname'];
+ protected function __construct( array $params ) {
+ foreach ( [ 'host', 'user', 'password', 'dbname' ] as $name ) {
+ $this->connectionParams[$name] = $params[$name];
+ }
$this->schema = $params['schema'];
$this->tablePrefix = $params['tablePrefix'];
// Set initial dummy domain until open() sets the final DB/prefix
$this->currentDomain = DatabaseDomain::newUnspecified();
+ }
- if ( $user ) {
- $this->open( $server, $user, $password, $dbName );
- } elseif ( $this->requiresDatabaseUser() ) {
- throw new InvalidArgumentException( "No database user provided." );
+ /**
+ * Initialize the connection to the database over the wire (or to local files)
+ *
+ * @throws LogicException
+ * @throws InvalidArgumentException
+ * @throws DBConnectionError
+ * @since 1.31
+ */
+ final public function initConnection() {
+ if ( $this->isOpen() ) {
+ throw new LogicException( __METHOD__ . ': already connected.' );
}
-
+ // Establish the connection
+ $this->doInitConnection();
// Set the domain object after open() sets the relevant fields
if ( $this->dbName != '' ) {
// Domains with server scope but a table prefix are not used by IDatabase classes
}
}
+ /**
+ * Actually connect to the database over the wire (or to local files)
+ *
+ * @throws InvalidArgumentException
+ * @throws DBConnectionError
+ * @since 1.31
+ */
+ protected function doInitConnection() {
+ if ( strlen( $this->connectionParams['user'] ) ) {
+ $this->open(
+ $this->connectionParams['host'],
+ $this->connectionParams['user'],
+ $this->connectionParams['password'],
+ $this->connectionParams['dbname']
+ );
+ } else {
+ throw new InvalidArgumentException( "No database user provided." );
+ }
+ }
+
/**
* Construct a Database subclass instance given a database type and parameters
*
* - agent: Optional name used to identify the end-user in query profiling/logging.
* - srvCache: Optional BagOStuff instance to an APC-style cache.
* - nonNativeInsertSelectBatchSize: Optional batch size for non-native INSERT SELECT emulation.
+ * @param int $connect One of the class constants (NEW_CONNECTED, NEW_UNCONNECTED) [optional]
* @return Database|null If the database driver or extension cannot be found
* @throws InvalidArgumentException If the database driver or extension cannot be found
* @since 1.18
*/
- final public static function factory( $dbType, $p = [] ) {
+ final public static function factory( $dbType, $p = [], $connect = self::NEW_CONNECTED ) {
$class = self::getClass( $dbType, isset( $p['driver'] ) ? $p['driver'] : null );
if ( class_exists( $class ) && is_subclass_of( $class, IDatabase::class ) ) {
};
}
+ /** @var Database $conn */
$conn = new $class( $p );
+ if ( $connect == self::NEW_CONNECTED ) {
+ $conn->initConnection();
+ }
} else {
$conn = null;
}
return $res;
}
- /**
- * Turns on (false) or off (true) the automatic generation and sending
- * of a "we're sorry, but there has been a database error" page on
- * database errors. Default is on (false). When turned off, the
- * code should use lastErrno() and lastError() to handle the
- * situation as appropriate.
- *
- * Do not use this function outside of the Database classes.
- *
- * @param null|bool $ignoreErrors
- * @return bool The previous value of the flag.
- */
- protected function ignoreErrors( $ignoreErrors = null ) {
- $res = $this->getFlag( self::DBO_IGNORE );
- if ( $ignoreErrors !== null ) {
- // setFlag()/clearFlag() do not allow DBO_IGNORE changes for sanity
- if ( $ignoreErrors ) {
- $this->flags |= self::DBO_IGNORE;
- } else {
- $this->flags &= ~self::DBO_IGNORE;
- }
- }
-
- return $res;
- }
-
public function trxLevel() {
return $this->trxLevel;
}
public function writesOrCallbacksPending() {
return $this->trxLevel && (
- $this->trxDoneWrites || $this->trxIdleCallbacks || $this->trxPreCommitCallbacks
+ $this->trxDoneWrites ||
+ $this->trxIdleCallbacks ||
+ $this->trxPreCommitCallbacks ||
+ $this->trxEndCallbacks
);
}
public function setFlag( $flag, $remember = self::REMEMBER_NOTHING ) {
if ( ( $flag & self::DBO_IGNORE ) ) {
- throw new \UnexpectedValueException( "Modifying DBO_IGNORE is not allowed." );
+ throw new UnexpectedValueException( "Modifying DBO_IGNORE is not allowed." );
}
if ( $remember === self::REMEMBER_PRIOR ) {
public function clearFlag( $flag, $remember = self::REMEMBER_NOTHING ) {
if ( ( $flag & self::DBO_IGNORE ) ) {
- throw new \UnexpectedValueException( "Modifying DBO_IGNORE is not allowed." );
+ throw new UnexpectedValueException( "Modifying DBO_IGNORE is not allowed." );
}
if ( $remember === self::REMEMBER_PRIOR ) {
public function close() {
if ( $this->conn ) {
+ // Resolve any dangling transaction first
if ( $this->trxLevel() ) {
+ // Meaningful transactions should ideally have been resolved by now
+ if ( $this->writesOrCallbacksPending() ) {
+ $this->queryLogger->warning(
+ __METHOD__ . ": writes or callbacks still pending.",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
+ }
+ // Check if it is possible to properly commit and trigger callbacks
+ if ( $this->trxEndCallbacksSuppressed ) {
+ throw new DBUnexpectedError(
+ $this,
+ __METHOD__ . ': callbacks are suppressed; cannot properly commit.'
+ );
+ }
+ // Commit the changes and run any callbacks as needed
$this->commit( __METHOD__, self::FLUSHING_INTERNAL );
}
-
+ // Close the actual connection in the binding handle
$closed = $this->closeConnection();
$this->conn = false;
- } elseif (
- $this->trxIdleCallbacks ||
- $this->trxPreCommitCallbacks ||
- $this->trxEndCallbacks
- ) { // sanity
- throw new RuntimeException( "Transaction callbacks still pending." );
+ // Sanity check that no callbacks are dangling
+ if (
+ $this->trxIdleCallbacks || $this->trxPreCommitCallbacks || $this->trxEndCallbacks
+ ) {
+ throw new RuntimeException( "Transaction callbacks still pending." );
+ }
} else {
- $closed = true;
+ $closed = true; // already closed; nothing to do
}
+
$this->opened = false;
return $closed;
*/
abstract protected function closeConnection();
+ /**
+ * @param string $error Fallback error message, used if none is given by DB
+ * @throws DBConnectionError
+ */
public function reportConnectionError( $error = 'Unknown error' ) {
$myError = $this->lastError();
if ( $myError ) {
}
/**
- * The DBMS-dependent part of query()
+ * Run a query and return a DBMS-dependent wrapper (that has all IResultWrapper methods)
+ *
+ * This might return things, such as mysqli_result, that do not formally implement
+ * IResultWrapper, but nonetheless implement all of its methods correctly
*
* @param string $sql SQL query.
- * @return ResultWrapper|bool Result object to feed to fetchObject,
- * fetchRow, ...; or false on failure
+ * @return IResultWrapper|bool Iterator to feed to fetchObject/fetchRow; false on failure
*/
abstract protected function doQuery( $sql );
$ret = $this->doProfiledQuery( $sql, $commentedSql, $isNonTempWrite, $fname );
# Try reconnecting if the connection was lost
- if ( false === $ret && $this->wasErrorReissuable() ) {
+ if ( false === $ret && $this->wasConnectionLoss() ) {
$recoverable = $this->canRecoverFromDisconnect( $sql, $priorWritesPending );
# Stash the last error values before anything might clear them
$lastError = $this->lastError();
$this->queryLogger->warning( $msg, $params +
[ 'trace' => ( new RuntimeException() )->getTraceAsString() ] );
- if ( !$recoverable ) {
- # Callers may catch the exception and continue to use the DB
- $this->reportQueryError( $lastError, $lastErrno, $sql, $fname );
- } else {
+ if ( $recoverable ) {
# Should be safe to silently retry the query
$ret = $this->doProfiledQuery( $sql, $commentedSql, $isNonTempWrite, $fname );
+ } else {
+ # Callers may catch the exception and continue to use the DB
+ $this->reportQueryError( $lastError, $lastErrno, $sql, $fname );
}
} else {
$msg = __METHOD__ . ': lost connection to {dbserver} permanently';
*/
private function handleSessionLoss() {
$this->trxLevel = 0;
- $this->trxIdleCallbacks = []; // T67263
- $this->trxPreCommitCallbacks = []; // T67263
+ $this->trxAtomicCounter = 0;
+ $this->trxIdleCallbacks = []; // T67263; transaction already lost
+ $this->trxPreCommitCallbacks = []; // T67263; transaction already lost
$this->sessionTempTables = [];
$this->namedLocksHeld = [];
+
+ // Note: if callback suppression is set then some *Callbacks arrays are not cleared here
+ $e = null;
try {
// Handle callbacks in trxEndCallbacks
$this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ } catch ( Exception $ex ) {
+ // Already logged; move on...
+ $e = $e ?: $ex;
+ }
+ try {
+ // Handle callbacks in trxRecurringCallbacks
$this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
- return null;
- } catch ( Exception $e ) {
+ } catch ( Exception $ex ) {
// Already logged; move on...
- return $e;
+ $e = $e ?: $ex;
}
+
+ return $e;
}
/**
return false;
}
+ /**
+ * Report a query error. Log the error, and if neither the object ignore
+ * flag nor the $tempIgnore flag is set, throw a DBQueryError.
+ *
+ * @param string $error
+ * @param int $errno
+ * @param string $sql
+ * @param string $fname
+ * @param bool $tempIgnore
+ * @throws DBQueryError
+ */
public function reportQueryError( $error, $errno, $sql, $fname, $tempIgnore = false ) {
- if ( $this->ignoreErrors() || $tempIgnore ) {
+ if ( $this->getFlag( self::DBO_IGNORE ) || $tempIgnore ) {
$this->queryLogger->debug( "SQL ERROR (ignored): $error\n" );
} else {
$sql1line = mb_substr( str_replace( "\n", "\\n", $sql ), 0, 5 * 1024 );
}
public function estimateRowCount(
- $table, $vars = '*', $conds = '', $fname = __METHOD__, $options = []
+ $table, $var = '*', $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
) {
- $rows = 0;
- $res = $this->select( $table, [ 'rowcount' => 'COUNT(*)' ], $conds, $fname, $options );
-
- if ( $res ) {
- $row = $this->fetchRow( $res );
- $rows = ( isset( $row['rowcount'] ) ) ? (int)$row['rowcount'] : 0;
+ $conds = $this->normalizeConditions( $conds, $fname );
+ $column = $this->extractSingleFieldFromList( $var );
+ if ( is_string( $column ) && !in_array( $column, [ '*', '1' ] ) ) {
+ $conds[] = "$column IS NOT NULL";
}
- return $rows;
+ $res = $this->select(
+ $table, [ 'rowcount' => 'COUNT(*)' ], $conds, $fname, $options, $join_conds
+ );
+ $row = $res ? $this->fetchRow( $res ) : [];
+
+ return isset( $row['rowcount'] ) ? (int)$row['rowcount'] : 0;
}
public function selectRowCount(
- $tables, $vars = '*', $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
+ $tables, $var = '*', $conds = '', $fname = __METHOD__, $options = [], $join_conds = []
) {
- $rows = 0;
- $sql = $this->selectSQLText( $tables, '1', $conds, $fname, $options, $join_conds );
- // The identifier quotes is primarily for MSSQL.
- $rowCountCol = $this->addIdentifierQuotes( "rowcount" );
- $tableName = $this->addIdentifierQuotes( "tmp_count" );
- $res = $this->query( "SELECT COUNT(*) AS $rowCountCol FROM ($sql) $tableName", $fname );
+ $conds = $this->normalizeConditions( $conds, $fname );
+ $column = $this->extractSingleFieldFromList( $var );
+ if ( is_string( $column ) && !in_array( $column, [ '*', '1' ] ) ) {
+ $conds[] = "$column IS NOT NULL";
+ }
- if ( $res ) {
- $row = $this->fetchRow( $res );
- $rows = ( isset( $row['rowcount'] ) ) ? (int)$row['rowcount'] : 0;
+ $res = $this->select(
+ [
+ 'tmp_count' => $this->buildSelectSubquery(
+ $tables,
+ '1',
+ $conds,
+ $fname,
+ $options,
+ $join_conds
+ )
+ ],
+ [ 'rowcount' => 'COUNT(*)' ],
+ [],
+ $fname
+ );
+ $row = $res ? $this->fetchRow( $res ) : [];
+
+ return isset( $row['rowcount'] ) ? (int)$row['rowcount'] : 0;
+ }
+
+ /**
+ * @param array|string $conds
+ * @param string $fname
+ * @return array
+ */
+ final protected function normalizeConditions( $conds, $fname ) {
+ if ( $conds === null || $conds === false ) {
+ $this->queryLogger->warning(
+ __METHOD__
+ . ' called from '
+ . $fname
+ . ' with incorrect parameters: $conds must be a string or an array'
+ );
+ $conds = '';
+ }
+
+ if ( !is_array( $conds ) ) {
+ $conds = ( $conds === '' ) ? [] : [ $conds ];
+ }
+
+ return $conds;
+ }
+
+ /**
+ * @param array|string $var Field parameter in the style of select()
+ * @return string|null Column name or null; ignores aliases
+ * @throws DBUnexpectedError Errors out if multiple columns are given
+ */
+ final protected function extractSingleFieldFromList( $var ) {
+ if ( is_array( $var ) ) {
+ if ( !$var ) {
+ $column = null;
+ } elseif ( count( $var ) == 1 ) {
+ $column = isset( $var[0] ) ? $var[0] : reset( $var );
+ } else {
+ throw new DBUnexpectedError( $this, __METHOD__ . ': got multiple columns.' );
+ }
+ } else {
+ $column = $var;
}
- return $rows;
+ return $column;
}
/**
return 'CAST( ' . $field . ' AS INTEGER )';
}
+ public function buildSelectSubquery(
+ $table, $vars, $conds = '', $fname = __METHOD__,
+ $options = [], $join_conds = []
+ ) {
+ return new Subquery(
+ $this->selectSQLText( $table, $vars, $conds, $fname, $options, $join_conds )
+ );
+ }
+
public function databasesAreIndependent() {
return false;
}
}
public function tableName( $name, $format = 'quoted' ) {
+ if ( $name instanceof Subquery ) {
+ throw new DBUnexpectedError(
+ $this,
+ __METHOD__ . ': got Subquery instance when expecting a string.'
+ );
+ }
+
# Skip the entire process when we have a string quoted on both ends.
# Note that we check the end so that we will still quote any use of
# use of `database`.table. But won't break things if someone wants
# any remote case where a word like on may be inside of a table name
# surrounded by symbols which may be considered word breaks.
if ( preg_match( '/(^|\s)(DISTINCT|JOIN|ON|AS)(\s|$)/i', $name ) !== 0 ) {
+ $this->queryLogger->warning(
+ __METHOD__ . ": use of subqueries is not supported this way.",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
+
return $name;
}
/**
* Get an aliased table name
- * e.g. tableName AS newTableName
*
- * @param string $name Table name, see tableName()
- * @param string|bool $alias Alias (optional)
+ * This returns strings like "tableName AS newTableName" for aliased tables
+ * and "(SELECT * from tableA) newTablename" for subqueries (e.g. derived tables)
+ *
+ * @see Database::tableName()
+ * @param string|Subquery $table Table name or object with a 'sql' field
+ * @param string|bool $alias Table alias (optional)
* @return string SQL name for aliased table. Will not alias a table to its own name
*/
- protected function tableNameWithAlias( $name, $alias = false ) {
- if ( !$alias || $alias == $name ) {
- return $this->tableName( $name );
+ protected function tableNameWithAlias( $table, $alias = false ) {
+ if ( is_string( $table ) ) {
+ $quotedTable = $this->tableName( $table );
+ } elseif ( $table instanceof Subquery ) {
+ $quotedTable = (string)$table;
} else {
- return $this->tableName( $name ) . ' ' . $this->addIdentifierQuotes( $alias );
+ throw new InvalidArgumentException( "Table must be a string or Subquery." );
+ }
+
+ if ( !strlen( $alias ) || $alias === $table ) {
+ if ( $table instanceof Subquery ) {
+ throw new InvalidArgumentException( "Subquery table missing alias." );
+ }
+
+ return $quotedTable;
+ } else {
+ return $quotedTable . ' ' . $this->addIdentifierQuotes( $alias );
}
}
if ( is_array( $table ) ) {
// A parenthesized group
if ( count( $table ) > 1 ) {
- $joinedTable = '('
- . $this->tableNamesWithIndexClauseOrJOIN( $table, $use_index, $ignore_index, $join_conds )
- . ')';
+ $joinedTable = '(' .
+ $this->tableNamesWithIndexClauseOrJOIN(
+ $table, $use_index, $ignore_index, $join_conds ) . ')';
} else {
// Degenerate case
$innerTable = reset( $table );
* @return string
*/
protected function indexName( $index ) {
- return $index;
+ return isset( $this->indexAliases[$index] )
+ ? $this->indexAliases[$index]
+ : $index;
}
public function addQuotes( $s ) {
}
}
- return ' LIKE ' . $this->addQuotes( $s ) . ' ESCAPE ' . $this->addQuotes( $escapeChar ) . ' ';
+ return ' LIKE ' .
+ $this->addQuotes( $s ) . ' ESCAPE ' . $this->addQuotes( $escapeChar ) . ' ';
}
public function anyChar() {
}
try {
- $this->startAtomic( $fname );
+ $this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
$affectedRowCount = 0;
foreach ( $rows as $row ) {
// Delete rows which collide with this one
$this->endAtomic( $fname );
$this->affectedRowCount = $affectedRowCount;
} catch ( Exception $e ) {
- $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ $this->cancelAtomic( $fname );
throw $e;
}
}
$affectedRowCount = 0;
try {
- $this->startAtomic( $fname );
+ $this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
# Update any existing conflicting row(s)
if ( $where !== false ) {
$ok = $this->update( $table, $set, $where, $fname );
$this->endAtomic( $fname );
$this->affectedRowCount = $affectedRowCount;
} catch ( Exception $e ) {
- $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ $this->cancelAtomic( $fname );
throw $e;
}
try {
$affectedRowCount = 0;
- $this->startAtomic( $fname );
+ $this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
$rows = [];
$ok = true;
foreach ( $res as $row ) {
$this->endAtomic( $fname );
$this->affectedRowCount = $affectedRowCount;
} else {
- $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ $this->cancelAtomic( $fname );
}
return $ok;
} catch ( Exception $e ) {
- $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ $this->cancelAtomic( $fname );
throw $e;
}
}
return false;
}
- public function wasErrorReissuable() {
- return false;
+ public function wasConnectionLoss() {
+ return $this->wasConnectionError( $this->lastErrno() );
}
public function wasReadOnlyError() {
return false;
}
+ public function wasErrorReissuable() {
+ return (
+ $this->wasDeadlock() ||
+ $this->wasLockTimeout() ||
+ $this->wasConnectionLoss()
+ );
+ }
+
/**
* Do not use this method outside of Database/DBError classes
*
$this->trxPreCommitCallbacks[] = [ $callback, $fname ];
} else {
// No transaction is active nor will start implicitly, so make one for this callback
- $this->startAtomic( __METHOD__ );
+ $this->startAtomic( __METHOD__, self::ATOMIC_CANCELABLE );
try {
call_user_func( $callback );
$this->endAtomic( __METHOD__ );
} catch ( Exception $e ) {
- $this->rollback( __METHOD__, self::FLUSHING_INTERNAL );
+ $this->cancelAtomic( __METHOD__ );
throw $e;
}
}
}
}
- final public function startAtomic( $fname = __METHOD__ ) {
+ /**
+ * Create a savepoint
+ *
+ * This is used internally to implement atomic sections. It should not be
+ * used otherwise.
+ *
+ * @since 1.31
+ * @param string $identifier Identifier for the savepoint
+ * @param string $fname Calling function name
+ */
+ protected function doSavepoint( $identifier, $fname ) {
+ $this->query( 'SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
+ }
+
+ /**
+ * Release a savepoint
+ *
+ * This is used internally to implement atomic sections. It should not be
+ * used otherwise.
+ *
+ * @since 1.31
+ * @param string $identifier Identifier for the savepoint
+ * @param string $fname Calling function name
+ */
+ protected function doReleaseSavepoint( $identifier, $fname ) {
+ $this->query( 'RELEASE SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
+ }
+
+ /**
+ * Rollback to a savepoint
+ *
+ * This is used internally to implement atomic sections. It should not be
+ * used otherwise.
+ *
+ * @since 1.31
+ * @param string $identifier Identifier for the savepoint
+ * @param string $fname Calling function name
+ */
+ protected function doRollbackToSavepoint( $identifier, $fname ) {
+ $this->query( 'ROLLBACK TO SAVEPOINT ' . $this->addIdentifierQuotes( $identifier ), $fname );
+ }
+
+ final public function startAtomic(
+ $fname = __METHOD__, $cancelable = self::ATOMIC_NOT_CANCELABLE
+ ) {
+ $savepointId = $cancelable === self::ATOMIC_CANCELABLE ? 'n/a' : null;
if ( !$this->trxLevel ) {
$this->begin( $fname, self::TRANSACTION_INTERNAL );
// If DBO_TRX is set, a series of startAtomic/endAtomic pairs will result
if ( !$this->getFlag( self::DBO_TRX ) ) {
$this->trxAutomaticAtomic = true;
}
+ } elseif ( $cancelable === self::ATOMIC_CANCELABLE ) {
+ $savepointId = 'wikimedia_rdbms_atomic' . ++$this->trxAtomicCounter;
+ if ( strlen( $savepointId ) > 30 ) { // 30 == Oracle's identifier length limit (pre 12c)
+ $this->queryLogger->warning(
+ 'There have been an excessively large number of atomic sections in a transaction'
+ . " started by $this->trxFname, reusing IDs (at $fname)",
+ [ 'trace' => ( new RuntimeException() )->getTraceAsString() ]
+ );
+ $this->trxAtomicCounter = 0;
+ $savepointId = 'wikimedia_rdbms_atomic' . ++$this->trxAtomicCounter;
+ }
+ $this->doSavepoint( $savepointId, $fname );
}
- $this->trxAtomicLevels[] = $fname;
+ $this->trxAtomicLevels[] = [ $fname, $savepointId ];
}
final public function endAtomic( $fname = __METHOD__ ) {
if ( !$this->trxLevel ) {
throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
}
- if ( !$this->trxAtomicLevels ||
- array_pop( $this->trxAtomicLevels ) !== $fname
- ) {
+
+ list( $savedFname, $savepointId ) = $this->trxAtomicLevels
+ ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
+ if ( $savedFname !== $fname ) {
throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
}
if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) {
$this->commit( $fname, self::FLUSHING_INTERNAL );
+ } elseif ( $savepointId && $savepointId !== 'n/a' ) {
+ $this->doReleaseSavepoint( $savepointId, $fname );
}
}
+ final public function cancelAtomic( $fname = __METHOD__ ) {
+ if ( !$this->trxLevel ) {
+ throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
+ }
+
+ list( $savedFname, $savepointId ) = $this->trxAtomicLevels
+ ? array_pop( $this->trxAtomicLevels ) : [ null, null ];
+ if ( $savedFname !== $fname ) {
+ throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
+ }
+ if ( !$savepointId ) {
+ throw new DBUnexpectedError( $this, "Uncancelable atomic section canceled (got $fname)." );
+ }
+
+ if ( !$this->trxAtomicLevels && $this->trxAutomaticAtomic ) {
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ } elseif ( $savepointId !== 'n/a' ) {
+ $this->doRollbackToSavepoint( $savepointId, $fname );
+ }
+
+ $this->affectedRowCount = 0; // for the sake of consistency
+ }
+
final public function doAtomicSection( $fname, callable $callback ) {
- $this->startAtomic( $fname );
+ $this->startAtomic( $fname, self::ATOMIC_CANCELABLE );
try {
$res = call_user_func_array( $callback, [ $this, $fname ] );
} catch ( Exception $e ) {
- $this->rollback( $fname, self::FLUSHING_INTERNAL );
+ $this->cancelAtomic( $fname );
throw $e;
}
$this->endAtomic( $fname );
// Protect against mismatched atomic section, transaction nesting, and snapshot loss
if ( $this->trxLevel ) {
if ( $this->trxAtomicLevels ) {
- $levels = implode( ', ', $this->trxAtomicLevels );
+ $levels = array_reduce( $this->trxAtomicLevels, function ( $accum, $v ) {
+ return $accum === null ? $v[0] : "$accum, " . $v[0];
+ } );
$msg = "$fname: Got explicit BEGIN while atomic section(s) $levels are open.";
throw new DBUnexpectedError( $this, $msg );
} elseif ( !$this->trxAutomatic ) {
$msg = "$fname: Explicit transaction already active (from {$this->trxFname}).";
throw new DBUnexpectedError( $this, $msg );
} else {
- // @TODO: make this an exception at some point
$msg = "$fname: Implicit transaction already active (from {$this->trxFname}).";
- $this->queryLogger->error( $msg );
- return; // join the main transaction set
+ throw new DBUnexpectedError( $this, $msg );
}
} elseif ( $this->getFlag( self::DBO_TRX ) && $mode !== self::TRANSACTION_INTERNAL ) {
- // @TODO: make this an exception at some point
$msg = "$fname: Implicit transaction expected (DBO_TRX set).";
- $this->queryLogger->error( $msg );
- return; // let any writes be in the main transaction
+ throw new DBUnexpectedError( $this, $msg );
}
// Avoid fatals if close() was called
$this->assertOpen();
$this->doBegin( $fname );
+ $this->trxAtomicCounter = 0;
$this->trxTimestamp = microtime( true );
$this->trxFname = $fname;
$this->trxDoneWrites = false;
final public function commit( $fname = __METHOD__, $flush = '' ) {
if ( $this->trxLevel && $this->trxAtomicLevels ) {
// There are still atomic sections open. This cannot be ignored
- $levels = implode( ', ', $this->trxAtomicLevels );
+ $levels = array_reduce( $this->trxAtomicLevels, function ( $accum, $v ) {
+ return $accum === null ? $v[0] : "$accum, " . $v[0];
+ } );
throw new DBUnexpectedError(
$this,
"$fname: Got COMMIT while atomic sections $levels are still open."
"$fname: No transaction to commit, something got out of sync." );
return; // nothing to do
} elseif ( $this->trxAutomatic ) {
- // @TODO: make this an exception at some point
- $msg = "$fname: Explicit commit of implicit transaction.";
- $this->queryLogger->error( $msg );
- return; // wait for the main transaction set commit round
+ throw new DBUnexpectedError(
+ $this,
+ "$fname: Expected mass commit of all peer transactions (DBO_TRX set)."
+ );
}
}
}
final public function rollback( $fname = __METHOD__, $flush = '' ) {
- if ( $flush === self::FLUSHING_INTERNAL || $flush === self::FLUSHING_ALL_PEERS ) {
- if ( !$this->trxLevel ) {
- return; // nothing to do
- }
- } else {
- if ( !$this->trxLevel ) {
- $this->queryLogger->error(
- "$fname: No transaction to rollback, something got out of sync." );
- return; // nothing to do
- } elseif ( $this->getFlag( self::DBO_TRX ) ) {
+ $trxActive = $this->trxLevel;
+
+ if ( $flush !== self::FLUSHING_INTERNAL && $flush !== self::FLUSHING_ALL_PEERS ) {
+ if ( $this->getFlag( self::DBO_TRX ) ) {
throw new DBUnexpectedError(
$this,
- "$fname: Expected mass rollback of all peer databases (DBO_TRX set)."
+ "$fname: Expected mass rollback of all peer transactions (DBO_TRX set)."
);
}
}
- // Avoid fatals if close() was called
- $this->assertOpen();
+ if ( $trxActive ) {
+ // Avoid fatals if close() was called
+ $this->assertOpen();
- $this->doRollback( $fname );
- $this->trxAtomicLevels = [];
- if ( $this->trxDoneWrites ) {
- $this->trxProfiler->transactionWritingOut(
- $this->server,
- $this->dbName,
- $this->trxShortId
- );
+ $this->doRollback( $fname );
+ $this->trxAtomicLevels = [];
+ if ( $this->trxDoneWrites ) {
+ $this->trxProfiler->transactionWritingOut(
+ $this->server,
+ $this->dbName,
+ $this->trxShortId
+ );
+ }
}
- $this->trxIdleCallbacks = []; // clear
- $this->trxPreCommitCallbacks = []; // clear
- try {
- $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
- } catch ( Exception $e ) {
- // already logged; finish and let LoadBalancer move on during mass-rollback
- }
- try {
- $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
- } catch ( Exception $e ) {
- // already logged; let LoadBalancer move on during mass-rollback
- }
+ // Clear any commit-dependant callbacks. They might even be present
+ // only due to transaction rounds, with no SQL transaction being active
+ $this->trxIdleCallbacks = [];
+ $this->trxPreCommitCallbacks = [];
- $this->affectedRowCount = 0; // for the sake of consistency
+ if ( $trxActive ) {
+ try {
+ $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ } catch ( Exception $e ) {
+ // already logged; finish and let LoadBalancer move on during mass-rollback
+ }
+ try {
+ $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
+ } catch ( Exception $e ) {
+ // already logged; let LoadBalancer move on during mass-rollback
+ }
+
+ $this->affectedRowCount = 0; // for the sake of consistency
+ }
}
/**
$this->tableAliases = $aliases;
}
- /**
- * @return bool Whether a DB user is required to access the DB
- * @since 1.28
- */
- protected function requiresDatabaseUser() {
- return true;
+ public function setIndexAliases( array $aliases ) {
+ $this->indexAliases = $aliases;
}
/**
* This catches broken callers than catch and ignore disconnection exceptions.
* Unlike checking isOpen(), this is safe to call inside of open().
*
- * @return resource|object
+ * @return mixed
* @throws DBUnexpectedError
* @since 1.26
*/