abstract class DatabaseBase implements IDatabase {
/** Number of times to re-try an operation in case of deadlock */
const DEADLOCK_TRIES = 4;
-
/** Minimum time to wait before retry, in microseconds */
const DEADLOCK_DELAY_MIN = 500000;
-
/** Maximum time to wait before retry */
const DEADLOCK_DELAY_MAX = 1500000;
+ /** How long before it is worth doing a dummy query to test the connection */
+ const PING_TTL = 1.0;
+ const PING_QUERY = 'SELECT 1 AS ping';
+
+ const TINY_WRITE_SEC = .010;
+ const SLOW_WRITE_SEC = .500;
+ const SMALL_WRITE_ROWS = 100;
+
+ /** @var string SQL query */
protected $mLastQuery = '';
+ /** @var bool */
protected $mDoneWrites = false;
+ /** @var string|bool */
protected $mPHPError = false;
-
- protected $mServer, $mUser, $mPassword, $mDBname;
+ /** @var string */
+ protected $mServer;
+ /** @var string */
+ protected $mUser;
+ /** @var string */
+ protected $mPassword;
+ /** @var string */
+ protected $mDBname;
+ /** @var bool */
+ protected $cliMode;
/** @var BagOStuff APC cache */
protected $srvCache;
/** @var resource Database connection */
protected $mConn = null;
+ /** @var bool */
protected $mOpened = false;
- /** @var callable[] */
+ /** @var array[] List of (callable, method name) */
protected $mTrxIdleCallbacks = [];
- /** @var callable[] */
+ /** @var array[] List of (callable, method name) */
protected $mTrxPreCommitCallbacks = [];
-
+ /** @var array[] List of (callable, method name) */
+ protected $mTrxEndCallbacks = [];
+ /** @var array[] Map of (name => (callable, method name)) */
+ protected $mTrxRecurringCallbacks = [];
+ /** @var bool Whether to suppress triggering of transaction end callbacks */
+ protected $mTrxEndCallbacksSuppressed = false;
+
+ /** @var string */
protected $mTablePrefix;
+ /** @var string */
protected $mSchema;
+ /** @var integer */
protected $mFlags;
+ /** @var bool */
protected $mForeign;
+ /** @var array */
protected $mLBInfo = [];
+ /** @var bool|null */
protected $mDefaultBigSelects = null;
+ /** @var array|bool */
protected $mSchemaVars = false;
/** @var array */
protected $mSessionVars = [];
-
+ /** @var array|null */
protected $preparedArgs;
-
+ /** @var string|bool|null Stashed value of html_errors INI setting */
protected $htmlErrors;
-
+ /** @var string */
protected $delimiter = ';';
/**
* @var int
*/
protected $mTrxLevel = 0;
-
/**
* Either a short hexidecimal string if a transaction is active or ""
*
* @see DatabaseBase::mTrxLevel
*/
protected $mTrxShortId = '';
-
/**
* The UNIX time that the transaction started. Callers can assume that if
* snapshot isolation is used, then the data is *at least* up to date to that
* @see DatabaseBase::mTrxLevel
*/
private $mTrxTimestamp = null;
-
/** @var float Lag estimate at the time of BEGIN */
- private $mTrxSlaveLag = null;
-
+ private $mTrxReplicaLag = null;
/**
* Remembers the function name given for starting the most recent transaction via begin().
* Used to provide additional context for error reporting.
* @see DatabaseBase::mTrxLevel
*/
private $mTrxFname = null;
-
/**
* Record if possible write queries were done in the last transaction started
*
* @see DatabaseBase::mTrxLevel
*/
private $mTrxDoneWrites = false;
-
/**
* Record if the current transaction was started implicitly due to DBO_TRX being set.
*
* @see DatabaseBase::mTrxLevel
*/
private $mTrxAutomatic = false;
-
/**
* Array of levels of atomicity within transactions
*
* @var array
*/
private $mTrxAtomicLevels = [];
-
/**
* Record if the current transaction was started implicitly by DatabaseBase::startAtomic
*
* @var bool
*/
private $mTrxAutomaticAtomic = false;
-
/**
* Track the write query callers of the current transaction
*
* @var string[]
*/
private $mTrxWriteCallers = [];
-
/**
- * Track the seconds spent in write queries for the current transaction
- *
- * @var float
+ * @var float Seconds spent in write queries for the current transaction
*/
private $mTrxWriteDuration = 0.0;
+ /**
+ * @var integer Number of write queries for the current transaction
+ */
+ private $mTrxWriteQueryCount = 0;
+ /**
+ * @var float Like mTrxWriteQueryCount but excludes lock-bound, easy to replicate, queries
+ */
+ private $mTrxWriteAdjDuration = 0.0;
+ /**
+ * @var integer Number of write queries counted in mTrxWriteAdjDuration
+ */
+ private $mTrxWriteAdjQueryCount = 0;
+ /**
+ * @var float RTT time estimate
+ */
+ private $mRTTEstimate = 0.0;
/** @var array Map of (name => 1) for locks obtained via lock() */
private $mNamedLocksHeld = [];
*/
protected $allViews = null;
+ /** @var float UNIX timestamp */
+ protected $lastPing = 0.0;
+
+ /** @var int[] Prior mFlags values */
+ private $priorFlags = [];
+
+ /** @var Profiler */
+ protected $profiler;
/** @var TransactionProfiler */
protected $trxProfiler;
* @return TransactionProfiler
*/
protected function getTransactionProfiler() {
- if ( !$this->trxProfiler ) {
- $this->trxProfiler = new TransactionProfiler();
- }
-
return $this->trxProfiler;
}
);
}
- public function pendingWriteQueryDuration() {
- return $this->mTrxLevel ? $this->mTrxWriteDuration : false;
+ public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) {
+ if ( !$this->mTrxLevel ) {
+ return false;
+ } elseif ( !$this->mTrxDoneWrites ) {
+ return 0.0;
+ }
+
+ switch ( $type ) {
+ case self::ESTIMATE_DB_APPLY:
+ $this->ping( $rtt );
+ $rttAdjTotal = $this->mTrxWriteAdjQueryCount * $rtt;
+ $applyTime = max( $this->mTrxWriteAdjDuration - $rttAdjTotal, 0 );
+ // For omitted queries, make them count as something at least
+ $omitted = $this->mTrxWriteQueryCount - $this->mTrxWriteAdjQueryCount;
+ $applyTime += self::TINY_WRITE_SEC * $omitted;
+
+ return $applyTime;
+ default: // everything
+ return $this->mTrxWriteDuration;
+ }
}
public function pendingWriteCallers() {
return $this->mOpened;
}
- public function setFlag( $flag ) {
+ public function setFlag( $flag, $remember = self::REMEMBER_NOTHING ) {
+ if ( $remember === self::REMEMBER_PRIOR ) {
+ array_push( $this->priorFlags, $this->mFlags );
+ }
$this->mFlags |= $flag;
}
- public function clearFlag( $flag ) {
+ public function clearFlag( $flag, $remember = self::REMEMBER_NOTHING ) {
+ if ( $remember === self::REMEMBER_PRIOR ) {
+ array_push( $this->priorFlags, $this->mFlags );
+ }
$this->mFlags &= ~$flag;
}
+ public function restoreFlags( $state = self::RESTORE_PRIOR ) {
+ if ( !$this->priorFlags ) {
+ return;
+ }
+
+ if ( $state === self::RESTORE_INITIAL ) {
+ $this->mFlags = reset( $this->priorFlags );
+ $this->priorFlags = [];
+ } else {
+ $this->mFlags = array_pop( $this->priorFlags );
+ }
+ }
+
public function getFlag( $flag ) {
return !!( $this->mFlags & $flag );
}
* @param array $params Parameters passed from DatabaseBase::factory()
*/
function __construct( array $params ) {
- global $wgDBprefix, $wgDBmwschema, $wgCommandLineMode;
+ global $wgDBprefix, $wgDBmwschema;
$this->srvCache = ObjectCache::getLocalServerInstance( 'hash' );
$schema = $params['schema'];
$foreign = $params['foreign'];
+ $this->cliMode = isset( $params['cliMode'] )
+ ? $params['cliMode']
+ : ( PHP_SAPI === 'cli' );
+
$this->mFlags = $flags;
if ( $this->mFlags & DBO_DEFAULT ) {
- if ( $wgCommandLineMode ) {
+ if ( $this->cliMode ) {
$this->mFlags &= ~DBO_TRX;
} else {
$this->mFlags |= DBO_TRX;
$this->mForeign = $foreign;
- if ( isset( $params['trxProfiler'] ) ) {
- $this->trxProfiler = $params['trxProfiler']; // override
- }
+ $this->profiler = isset( $params['profiler'] )
+ ? $params['profiler']
+ : Profiler::instance(); // @TODO: remove global state
+ $this->trxProfiler = isset( $params['trxProfiler'] )
+ ? $params['trxProfiler']
+ : new TransactionProfiler();
if ( $user ) {
$this->open( $server, $user, $password, $dbName );
}
+
}
/**
* @return DatabaseBase|null DatabaseBase subclass or null
*/
final public static function factory( $dbType, $p = [] ) {
+ global $wgCommandLineMode;
+
$canonicalDBTypes = [
'mysql' => [ 'mysqli', 'mysql' ],
'postgres' => [],
$p['schema'] = isset( $defaultSchemas[$dbType] ) ? $defaultSchemas[$dbType] : null;
}
$p['foreign'] = isset( $p['foreign'] ) ? $p['foreign'] : false;
+ $p['cliMode'] = $wgCommandLineMode;
return new $class( $p );
} else {
}
public function close() {
- if ( count( $this->mTrxIdleCallbacks ) ) { // sanity
- throw new MWException( "Transaction idle callbacks still pending." );
- }
if ( $this->mConn ) {
if ( $this->trxLevel() ) {
if ( !$this->mTrxAutomatic ) {
" performing implicit commit before closing connection!" );
}
- $this->commit( __METHOD__, 'flush' );
+ $this->commit( __METHOD__, self::FLUSHING_INTERNAL );
}
$closed = $this->closeConnection();
$this->mConn = false;
+ } elseif ( $this->mTrxIdleCallbacks || $this->mTrxEndCallbacks ) { // sanity
+ throw new MWException( "Transaction callbacks still pending." );
} else {
$closed = true;
}
* @return bool
*/
protected function isWriteQuery( $sql ) {
- return !preg_match( '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SET|SHOW|EXPLAIN|\(SELECT)\b/i', $sql );
+ return !preg_match(
+ '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SET|SHOW|EXPLAIN|\(SELECT)\b/i', $sql );
+ }
+
+ /**
+ * @param $sql
+ * @return string|null
+ */
+ protected function getQueryVerb( $sql ) {
+ return preg_match( '/^\s*([a-z]+)/i', $sql, $m ) ? strtoupper( $m[1] ) : null;
}
/**
* @return bool
*/
protected function isTransactableQuery( $sql ) {
- $verb = substr( $sql, 0, strcspn( $sql, " \t\r\n" ) );
- return !in_array( $verb, [ 'BEGIN', 'COMMIT', 'ROLLBACK', 'SHOW', 'SET' ] );
+ $verb = $this->getQueryVerb( $sql );
+ return !in_array( $verb, [ 'BEGIN', 'COMMIT', 'ROLLBACK', 'SHOW', 'SET' ], true );
}
public function query( $sql, $fname = __METHOD__, $tempIgnore = false ) {
global $wgUser;
+ $priorWritesPending = $this->writesOrCallbacksPending();
$this->mLastQuery = $sql;
- $isWriteQuery = $this->isWriteQuery( $sql );
- if ( $isWriteQuery ) {
+ $isWrite = $this->isWriteQuery( $sql );
+ if ( $isWrite ) {
$reason = $this->getReadOnlyReason();
if ( $reason !== false ) {
throw new DBReadOnlyError( $this, "Database is read-only: $reason" );
// Or, for one-word queries (like "BEGIN" or COMMIT") add it to the end (bug 42598)
$commentedSql = preg_replace( '/\s|$/', " /* $fname $userName */ ", $sql, 1 );
- if ( !$this->mTrxLevel && $this->getFlag( DBO_TRX ) && $this->isTransactableQuery( $sql ) ) {
- $this->begin( __METHOD__ . " ($fname)" );
+ # Start implicit transactions that wrap the request if DBO_TRX is enabled
+ if ( !$this->mTrxLevel && $this->getFlag( DBO_TRX )
+ && $this->isTransactableQuery( $sql )
+ ) {
+ $this->begin( __METHOD__ . " ($fname)", self::TRANSACTION_INTERNAL );
$this->mTrxAutomatic = true;
}
# Keep track of whether the transaction has write queries pending
- if ( $this->mTrxLevel && !$this->mTrxDoneWrites && $isWriteQuery ) {
+ if ( $this->mTrxLevel && !$this->mTrxDoneWrites && $isWrite ) {
$this->mTrxDoneWrites = true;
$this->getTransactionProfiler()->transactionWritingIn(
$this->mServer, $this->mDBname, $this->mTrxShortId );
}
- $isMaster = !is_null( $this->getLBInfo( 'master' ) );
- # generalizeSQL will probably cut down the query to reasonable
- # logging size most of the time. The substr is really just a sanity check.
- if ( $isMaster ) {
- $queryProf = 'query-m: ' . substr( DatabaseBase::generalizeSQL( $sql ), 0, 255 );
- $totalProf = 'DatabaseBase::query-master';
- } else {
- $queryProf = 'query: ' . substr( DatabaseBase::generalizeSQL( $sql ), 0, 255 );
- $totalProf = 'DatabaseBase::query';
- }
- # Include query transaction state
- $queryProf .= $this->mTrxShortId ? " [TRX#{$this->mTrxShortId}]" : "";
-
- $profiler = Profiler::instance();
- if ( !$profiler instanceof ProfilerStub ) {
- $totalProfSection = $profiler->scopedProfileIn( $totalProf );
- $queryProfSection = $profiler->scopedProfileIn( $queryProf );
- }
-
if ( $this->debug() ) {
wfDebugLog( 'queries', sprintf( "%s: %s", $this->mDBname, $commentedSql ) );
}
- $queryId = MWDebug::query( $sql, $fname, $isMaster );
-
# Avoid fatals if close() was called
$this->assertOpen();
- # Do the query and handle errors
- $startTime = microtime( true );
- $ret = $this->doQuery( $commentedSql );
- $queryRuntime = microtime( true ) - $startTime;
- # Log the query time and feed it into the DB trx profiler
- $this->getTransactionProfiler()->recordQueryCompletion(
- $queryProf, $startTime, $isWriteQuery, $this->affectedRows() );
-
- MWDebug::queryTime( $queryId );
+ # Send the query to the server
+ $ret = $this->doProfiledQuery( $sql, $commentedSql, $isWrite, $fname );
# Try reconnecting if the connection was lost
if ( false === $ret && $this->wasErrorReissuable() ) {
- # Transaction is gone; this can mean lost writes or REPEATABLE-READ snapshots
- $hadTrx = $this->mTrxLevel;
- # T127428: for non-write transactions, a disconnect and a COMMIT are similar:
- # neither changed data and in both cases any read snapshots are reset anyway.
- $isNoopCommit = ( !$this->writesOrCallbacksPending() && $sql === 'COMMIT' );
- # Update state tracking to reflect transaction loss
- $this->mTrxLevel = 0;
- $this->mTrxIdleCallbacks = []; // bug 65263
- $this->mTrxPreCommitCallbacks = []; // bug 65263
- wfDebug( "Connection lost, reconnecting...\n" );
- # Stash the last error values since ping() might clear them
+ $recoverable = $this->canRecoverFromDisconnect( $sql, $priorWritesPending );
+ # Stash the last error values before anything might clear them
$lastError = $this->lastError();
$lastErrno = $this->lastErrno();
- if ( $this->ping() ) {
+ # Update state tracking to reflect transaction loss due to disconnection
+ $this->handleTransactionLoss();
+ wfDebug( "Connection lost, reconnecting...\n" );
+ if ( $this->reconnect() ) {
wfDebug( "Reconnected\n" );
- $server = $this->getServer();
- $msg = __METHOD__ . ": lost connection to $server; reconnected";
+ $msg = __METHOD__ . ": lost connection to {$this->getServer()}; reconnected";
wfDebugLog( 'DBPerformance', "$msg:\n" . wfBacktrace( true ) );
- if ( ( $hadTrx && !$isNoopCommit ) || $this->mNamedLocksHeld ) {
- # Leave $ret as false and let an error be reported.
- # Callers may catch the exception and continue to use the DB.
- $this->reportQueryError( $lastError, $lastErrno, $sql, $fname, $tempIgnore );
+ if ( !$recoverable ) {
+ # Callers may catch the exception and continue to use the DB
+ $this->reportQueryError( $lastError, $lastErrno, $sql, $fname );
} else {
- # Should be safe to silently retry (no trx/callbacks/locks)
- $startTime = microtime( true );
- $ret = $this->doQuery( $commentedSql );
- $queryRuntime = microtime( true ) - $startTime;
- # Log the query time and feed it into the DB trx profiler
- $this->getTransactionProfiler()->recordQueryCompletion(
- $queryProf, $startTime, $isWriteQuery, $this->affectedRows() );
+ # Should be safe to silently retry the query
+ $ret = $this->doProfiledQuery( $sql, $commentedSql, $isWrite, $fname );
}
} else {
wfDebug( "Failed\n" );
}
if ( false === $ret ) {
+ # Deadlocks cause the entire transaction to abort, not just the statement.
+ # http://dev.mysql.com/doc/refman/5.7/en/innodb-error-handling.html
+ # https://www.postgresql.org/docs/9.1/static/explicit-locking.html
+ if ( $this->wasDeadlock() ) {
+ if ( $this->explicitTrxActive() || $priorWritesPending ) {
+ $tempIgnore = false; // not recoverable
+ }
+ # Update state tracking to reflect transaction loss
+ $this->handleTransactionLoss();
+ }
+
$this->reportQueryError(
$this->lastError(), $this->lastErrno(), $sql, $fname, $tempIgnore );
}
$res = $this->resultObject( $ret );
- // Destroy profile sections in the opposite order to their creation
- ScopedCallback::consume( $queryProfSection );
- ScopedCallback::consume( $totalProfSection );
+ return $res;
+ }
- if ( $isWriteQuery && $this->mTrxLevel ) {
- $this->mTrxWriteDuration += $queryRuntime;
- $this->mTrxWriteCallers[] = $fname;
+ private function doProfiledQuery( $sql, $commentedSql, $isWrite, $fname ) {
+ $isMaster = !is_null( $this->getLBInfo( 'master' ) );
+ # generalizeSQL() will probably cut down the query to reasonable
+ # logging size most of the time. The substr is really just a sanity check.
+ if ( $isMaster ) {
+ $queryProf = 'query-m: ' . substr( DatabaseBase::generalizeSQL( $sql ), 0, 255 );
+ } else {
+ $queryProf = 'query: ' . substr( DatabaseBase::generalizeSQL( $sql ), 0, 255 );
}
- return $res;
+ # Include query transaction state
+ $queryProf .= $this->mTrxShortId ? " [TRX#{$this->mTrxShortId}]" : "";
+
+ $startTime = microtime( true );
+ $this->profiler->profileIn( $queryProf );
+ $ret = $this->doQuery( $commentedSql );
+ $this->profiler->profileOut( $queryProf );
+ $queryRuntime = max( microtime( true ) - $startTime, 0.0 );
+
+ unset( $queryProfSection ); // profile out (if set)
+
+ if ( $ret !== false ) {
+ $this->lastPing = $startTime;
+ if ( $isWrite && $this->mTrxLevel ) {
+ $this->updateTrxWriteQueryTime( $sql, $queryRuntime );
+ $this->mTrxWriteCallers[] = $fname;
+ }
+ }
+
+ if ( $sql === self::PING_QUERY ) {
+ $this->mRTTEstimate = $queryRuntime;
+ }
+
+ $this->getTransactionProfiler()->recordQueryCompletion(
+ $queryProf, $startTime, $isWrite, $this->affectedRows()
+ );
+ MWDebug::query( $sql, $fname, $isMaster, $queryRuntime );
+
+ return $ret;
+ }
+
+ /**
+ * Update the estimated run-time of a query, not counting large row lock times
+ *
+ * LoadBalancer can be set to rollback transactions that will create huge replication
+ * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple
+ * queries, like inserting a row can take a long time due to row locking. This method
+ * uses some simple heuristics to discount those cases.
+ *
+ * @param string $sql A SQL write query
+ * @param float $runtime Total runtime, including RTT
+ */
+ private function updateTrxWriteQueryTime( $sql, $runtime ) {
+ // Whether this is indicative of replica DB runtime (except for RBR or ws_repl)
+ $indicativeOfReplicaRuntime = true;
+ if ( $runtime > self::SLOW_WRITE_SEC ) {
+ $verb = $this->getQueryVerb( $sql );
+ // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks
+ if ( $verb === 'INSERT' ) {
+ $indicativeOfReplicaRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS;
+ } elseif ( $verb === 'REPLACE' ) {
+ $indicativeOfReplicaRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS / 2;
+ }
+ }
+
+ $this->mTrxWriteDuration += $runtime;
+ $this->mTrxWriteQueryCount += 1;
+ if ( $indicativeOfReplicaRuntime ) {
+ $this->mTrxWriteAdjDuration += $runtime;
+ $this->mTrxWriteAdjQueryCount += 1;
+ }
+ }
+
+ private function canRecoverFromDisconnect( $sql, $priorWritesPending ) {
+ # Transaction dropped; this can mean lost writes, or REPEATABLE-READ snapshots.
+ # Dropped connections also mean that named locks are automatically released.
+ # Only allow error suppression in autocommit mode or when the lost transaction
+ # didn't matter anyway (aside from DBO_TRX snapshot loss).
+ if ( $this->mNamedLocksHeld ) {
+ return false; // possible critical section violation
+ } elseif ( $sql === 'COMMIT' ) {
+ return !$priorWritesPending; // nothing written anyway? (T127428)
+ } elseif ( $sql === 'ROLLBACK' ) {
+ return true; // transaction lost...which is also what was requested :)
+ } elseif ( $this->explicitTrxActive() ) {
+ return false; // don't drop atomocity
+ } elseif ( $priorWritesPending ) {
+ return false; // prior writes lost from implicit transaction
+ }
+
+ return true;
+ }
+
+ private function handleTransactionLoss() {
+ $this->mTrxLevel = 0;
+ $this->mTrxIdleCallbacks = []; // bug 65263
+ $this->mTrxPreCommitCallbacks = []; // bug 65263
+ try {
+ // Handle callbacks in mTrxEndCallbacks
+ $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
+ return null;
+ } catch ( Exception $e ) {
+ // Already logged; move on...
+ return $e;
+ }
}
public function reportQueryError( $error, $errno, $sql, $fname, $tempIgnore = false ) {
} else {
$useIndex = '';
}
+ if ( isset( $options['IGNORE INDEX'] ) && is_string( $options['IGNORE INDEX'] ) ) {
+ $ignoreIndex = $this->ignoreIndexClause( $options['IGNORE INDEX'] );
+ } else {
+ $ignoreIndex = '';
+ }
- return [ $startOpts, $useIndex, $preLimitTail, $postLimitTail ];
+ return [ $startOpts, $useIndex, $preLimitTail, $postLimitTail, $ignoreIndex ];
}
/**
$useIndexes = ( isset( $options['USE INDEX'] ) && is_array( $options['USE INDEX'] ) )
? $options['USE INDEX']
: [];
+ $ignoreIndexes = ( isset( $options['IGNORE INDEX'] ) && is_array( $options['IGNORE INDEX'] ) )
+ ? $options['IGNORE INDEX']
+ : [];
if ( is_array( $table ) ) {
$from = ' FROM ' .
- $this->tableNamesWithUseIndexOrJOIN( $table, $useIndexes, $join_conds );
+ $this->tableNamesWithIndexClauseOrJOIN( $table, $useIndexes, $ignoreIndexes, $join_conds );
} elseif ( $table != '' ) {
if ( $table[0] == ' ' ) {
$from = ' FROM ' . $table;
} else {
$from = ' FROM ' .
- $this->tableNamesWithUseIndexOrJOIN( [ $table ], $useIndexes, [] );
+ $this->tableNamesWithIndexClauseOrJOIN( [ $table ], $useIndexes, $ignoreIndexes, [] );
}
} else {
$from = '';
}
- list( $startOpts, $useIndex, $preLimitTail, $postLimitTail ) =
+ list( $startOpts, $useIndex, $preLimitTail, $postLimitTail, $ignoreIndex ) =
$this->makeSelectOptions( $options );
if ( !empty( $conds ) ) {
if ( is_array( $conds ) ) {
$conds = $this->makeList( $conds, LIST_AND );
}
- $sql = "SELECT $startOpts $vars $from $useIndex WHERE $conds $preLimitTail";
+ $sql = "SELECT $startOpts $vars $from $useIndex $ignoreIndex WHERE $conds $preLimitTail";
} else {
- $sql = "SELECT $startOpts $vars $from $useIndex $preLimitTail";
+ $sql = "SELECT $startOpts $vars $from $useIndex $ignoreIndex $preLimitTail";
}
if ( isset( $options['LIMIT'] ) ) {
return '(' . $this->selectSQLText( $table, $fld, $conds, null, [], $join_conds ) . ')';
}
+ /**
+ * @param string $field Field or column to cast
+ * @return string
+ * @since 1.28
+ */
+ public function buildStringCast( $field ) {
+ return $field;
+ }
+
public function selectDB( $db ) {
# Stub. Shouldn't cause serious problems if it's not overridden, but
# if your database engine supports a concept similar to MySQL's
/**
* Gets an array of aliased table names
*
- * @param array $tables Array( [alias] => table )
+ * @param array $tables [ [alias] => table ]
* @return string[] See tableNameWithAlias()
*/
public function tableNamesWithAlias( $tables ) {
/**
* Gets an array of aliased field names
*
- * @param array $fields Array( [alias] => field )
+ * @param array $fields [ [alias] => field ]
* @return string[] See fieldNameWithAlias()
*/
public function fieldNamesWithAlias( $fields ) {
/**
* Get the aliased table name clause for a FROM clause
- * which might have a JOIN and/or USE INDEX clause
+ * which might have a JOIN and/or USE INDEX or IGNORE INDEX clause
*
* @param array $tables ( [alias] => table )
* @param array $use_index Same as for select()
+ * @param array $ignore_index Same as for select()
* @param array $join_conds Same as for select()
* @return string
*/
- protected function tableNamesWithUseIndexOrJOIN(
- $tables, $use_index = [], $join_conds = []
+ protected function tableNamesWithIndexClauseOrJOIN(
+ $tables, $use_index = [], $ignore_index = [], $join_conds = []
) {
$ret = [];
$retJOIN = [];
$use_index = (array)$use_index;
+ $ignore_index = (array)$ignore_index;
$join_conds = (array)$join_conds;
foreach ( $tables as $alias => $table ) {
$tableClause .= ' ' . $use;
}
}
+ if ( isset( $ignore_index[$alias] ) ) { // has IGNORE INDEX?
+ $ignore = $this->ignoreIndexClause( implode( ',', (array)$ignore_index[$alias] ) );
+ if ( $ignore != '' ) {
+ $tableClause .= ' ' . $ignore;
+ }
+ }
$on = $this->makeList( (array)$conds, LIST_AND );
if ( $on != '' ) {
$tableClause .= ' ON (' . $on . ')';
implode( ',', (array)$use_index[$alias] )
);
+ $ret[] = $tableClause;
+ } elseif ( isset( $ignore_index[$alias] ) ) {
+ // Is there an INDEX clause for this table?
+ $tableClause = $this->tableNameWithAlias( $table, $alias );
+ $tableClause .= ' ' . $this->ignoreIndexClause(
+ implode( ',', (array)$ignore_index[$alias] )
+ );
+
$ret[] = $tableClause;
} else {
$tableClause = $this->tableNameWithAlias( $table, $alias );
return '';
}
+ /**
+ * IGNORE INDEX clause. Unlikely to be useful for anything but MySQL. This
+ * is only needed because a) MySQL must be as efficient as possible due to
+ * its use on Wikipedia, and b) MySQL 4.0 is kind of dumb sometimes about
+ * which index to pick. Anyway, other databases might have different
+ * indexes on a given table. So don't bother overriding this unless you're
+ * MySQL.
+ * @param string $index
+ * @return string
+ */
+ public function ignoreIndexClause( $index ) {
+ return '';
+ }
+
public function replace( $table, $uniqueIndexes, $rows, $fname = __METHOD__ ) {
$quotedTable = $this->tableName( $table );
$useTrx = !$this->mTrxLevel;
if ( $useTrx ) {
- $this->begin( $fname );
+ $this->begin( $fname, self::TRANSACTION_INTERNAL );
}
try {
# Update any existing conflicting row(s)
$ok = $this->insert( $table, $rows, $fname, [ 'IGNORE' ] ) && $ok;
} catch ( Exception $e ) {
if ( $useTrx ) {
- $this->rollback( $fname );
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
}
throw $e;
}
if ( $useTrx ) {
- $this->commit( $fname );
+ $this->commit( $fname, self::FLUSHING_INTERNAL );
}
return $ok;
return $this->query( $sql, $fname );
}
- public function insertSelect( $destTable, $srcTable, $varMap, $conds,
+ public function insertSelect(
+ $destTable, $srcTable, $varMap, $conds,
+ $fname = __METHOD__, $insertOptions = [], $selectOptions = []
+ ) {
+ if ( $this->cliMode ) {
+ // For massive migrations with downtime, we don't want to select everything
+ // into memory and OOM, so do all this native on the server side if possible.
+ return $this->nativeInsertSelect(
+ $destTable,
+ $srcTable,
+ $varMap,
+ $conds,
+ $fname,
+ $insertOptions,
+ $selectOptions
+ );
+ }
+
+ // For web requests, do a locking SELECT and then INSERT. This puts the SELECT burden
+ // on only the master (without needing row-based-replication). It also makes it easy to
+ // know how big the INSERT is going to be.
+ $fields = [];
+ foreach ( $varMap as $dstColumn => $sourceColumnOrSql ) {
+ $fields[] = $this->fieldNameWithAlias( $sourceColumnOrSql, $dstColumn );
+ }
+ $selectOptions[] = 'FOR UPDATE';
+ $res = $this->select( $srcTable, implode( ',', $fields ), $conds, $fname, $selectOptions );
+ if ( !$res ) {
+ return false;
+ }
+
+ $rows = [];
+ foreach ( $res as $row ) {
+ $rows[] = (array)$row;
+ }
+
+ return $this->insert( $destTable, $rows, $fname, $insertOptions );
+ }
+
+ public function nativeInsertSelect( $destTable, $srcTable, $varMap, $conds,
$fname = __METHOD__,
$insertOptions = [], $selectOptions = []
) {
$selectOptions = [ $selectOptions ];
}
- list( $startOpts, $useIndex, $tailOpts ) = $this->makeSelectOptions( $selectOptions );
+ list( $startOpts, $useIndex, $tailOpts, $ignoreIndex ) = $this->makeSelectOptions(
+ $selectOptions );
if ( is_array( $srcTable ) ) {
$srcTable = implode( ',', array_map( [ &$this, 'tableName' ], $srcTable ) );
$sql = "INSERT $insertOptions INTO $destTable (" . implode( ',', array_keys( $varMap ) ) . ')' .
" SELECT $startOpts " . implode( ',', $varMap ) .
- " FROM $srcTable $useIndex ";
+ " FROM $srcTable $useIndex $ignoreIndex ";
if ( $conds != '*' ) {
if ( is_array( $conds ) ) {
* queries. If a deadlock occurs during the processing, the transaction
* will be rolled back and the callback function will be called again.
*
+ * Avoid using this method outside of Job or Maintenance classes.
+ *
* Usage:
* $dbw->deadlockLoop( callback, ... );
*
* Extra arguments are passed through to the specified callback function.
+ * This method requires that no transactions are already active to avoid
+ * causing premature commits or exceptions.
*
* Returns whatever the callback function returned on its successful,
* iteration, or false on error, for example if the retry limit was
* reached.
+ *
* @return mixed
* @throws DBUnexpectedError
* @throws Exception
return false;
}
- final public function onTransactionIdle( $callback ) {
+ public function serverIsReadOnly() {
+ return false;
+ }
+
+ final public function onTransactionResolution( callable $callback ) {
+ if ( !$this->mTrxLevel ) {
+ throw new DBUnexpectedError( $this, "No transaction is active." );
+ }
+ $this->mTrxEndCallbacks[] = [ $callback, wfGetCaller() ];
+ }
+
+ final public function onTransactionIdle( callable $callback ) {
$this->mTrxIdleCallbacks[] = [ $callback, wfGetCaller() ];
if ( !$this->mTrxLevel ) {
- $this->runOnTransactionIdleCallbacks();
+ $this->runOnTransactionIdleCallbacks( self::TRIGGER_IDLE );
}
}
- final public function onTransactionPreCommitOrIdle( $callback ) {
+ final public function onTransactionPreCommitOrIdle( callable $callback ) {
if ( $this->mTrxLevel ) {
$this->mTrxPreCommitCallbacks[] = [ $callback, wfGetCaller() ];
} else {
- $this->onTransactionIdle( $callback ); // this will trigger immediately
+ // If no transaction is active, then make one for this callback
+ $this->startAtomic( __METHOD__ );
+ try {
+ call_user_func( $callback );
+ $this->endAtomic( __METHOD__ );
+ } catch ( Exception $e ) {
+ $this->rollback( __METHOD__, self::FLUSHING_INTERNAL );
+ throw $e;
+ }
+ }
+ }
+
+ final public function setTransactionListener( $name, callable $callback = null ) {
+ if ( $callback ) {
+ $this->mTrxRecurringCallbacks[$name] = [ $callback, wfGetCaller() ];
+ } else {
+ unset( $this->mTrxRecurringCallbacks[$name] );
}
}
/**
- * Actually any "on transaction idle" callbacks.
+ * Whether to disable running of post-COMMIT/ROLLBACK callbacks
*
+ * This method should not be used outside of Database/LoadBalancer
+ *
+ * @param bool $suppress
+ * @since 1.28
+ */
+ final public function setTrxEndCallbackSuppression( $suppress ) {
+ $this->mTrxEndCallbacksSuppressed = $suppress;
+ }
+
+ /**
+ * Actually run and consume any "on transaction idle/resolution" callbacks.
+ *
+ * This method should not be used outside of Database/LoadBalancer
+ *
+ * @param integer $trigger IDatabase::TRIGGER_* constant
* @since 1.20
+ * @throws Exception
*/
- protected function runOnTransactionIdleCallbacks() {
- $autoTrx = $this->getFlag( DBO_TRX ); // automatic begin() enabled?
+ public function runOnTransactionIdleCallbacks( $trigger ) {
+ if ( $this->mTrxEndCallbacksSuppressed ) {
+ return;
+ }
- $e = $ePrior = null; // last exception
+ $autoTrx = $this->getFlag( DBO_TRX ); // automatic begin() enabled?
+ /** @var Exception $e */
+ $e = null; // first exception
do { // callbacks may add callbacks :)
- $callbacks = $this->mTrxIdleCallbacks;
- $this->mTrxIdleCallbacks = []; // recursion guard
+ $callbacks = array_merge(
+ $this->mTrxIdleCallbacks,
+ $this->mTrxEndCallbacks // include "transaction resolution" callbacks
+ );
+ $this->mTrxIdleCallbacks = []; // consumed (and recursion guard)
+ $this->mTrxEndCallbacks = []; // consumed (recursion guard)
foreach ( $callbacks as $callback ) {
try {
list( $phpCallback ) = $callback;
$this->clearFlag( DBO_TRX ); // make each query its own transaction
- call_user_func( $phpCallback );
+ call_user_func_array( $phpCallback, [ $trigger ] );
if ( $autoTrx ) {
$this->setFlag( DBO_TRX ); // restore automatic begin()
} else {
$this->clearFlag( DBO_TRX ); // restore auto-commit
}
- } catch ( Exception $e ) {
- if ( $ePrior ) {
- MWExceptionHandler::logException( $ePrior );
- }
- $ePrior = $e;
+ } catch ( Exception $ex ) {
+ MWExceptionHandler::logException( $ex );
+ $e = $e ?: $ex;
// Some callbacks may use startAtomic/endAtomic, so make sure
// their transactions are ended so other callbacks don't fail
if ( $this->trxLevel() ) {
- $this->rollback( __METHOD__ );
+ $this->rollback( __METHOD__, self::FLUSHING_INTERNAL );
}
}
}
} while ( count( $this->mTrxIdleCallbacks ) );
if ( $e instanceof Exception ) {
- throw $e; // re-throw any last exception
+ throw $e; // re-throw any first exception
}
}
/**
- * Actually any "on transaction pre-commit" callbacks.
+ * Actually run and consume any "on transaction pre-commit" callbacks.
+ *
+ * This method should not be used outside of Database/LoadBalancer
*
* @since 1.22
+ * @throws Exception
*/
- protected function runOnTransactionPreCommitCallbacks() {
- $e = $ePrior = null; // last exception
+ public function runOnTransactionPreCommitCallbacks() {
+ $e = null; // first exception
do { // callbacks may add callbacks :)
$callbacks = $this->mTrxPreCommitCallbacks;
- $this->mTrxPreCommitCallbacks = []; // recursion guard
+ $this->mTrxPreCommitCallbacks = []; // consumed (and recursion guard)
foreach ( $callbacks as $callback ) {
try {
list( $phpCallback ) = $callback;
call_user_func( $phpCallback );
- } catch ( Exception $e ) {
- if ( $ePrior ) {
- MWExceptionHandler::logException( $ePrior );
- }
- $ePrior = $e;
+ } catch ( Exception $ex ) {
+ MWExceptionHandler::logException( $ex );
+ $e = $e ?: $ex;
}
}
} while ( count( $this->mTrxPreCommitCallbacks ) );
if ( $e instanceof Exception ) {
- throw $e; // re-throw any last exception
+ throw $e; // re-throw any first exception
+ }
+ }
+
+ /**
+ * Actually run any "transaction listener" callbacks.
+ *
+ * This method should not be used outside of Database/LoadBalancer
+ *
+ * @param integer $trigger IDatabase::TRIGGER_* constant
+ * @throws Exception
+ * @since 1.20
+ */
+ public function runTransactionListenerCallbacks( $trigger ) {
+ if ( $this->mTrxEndCallbacksSuppressed ) {
+ return;
+ }
+
+ /** @var Exception $e */
+ $e = null; // first exception
+
+ foreach ( $this->mTrxRecurringCallbacks as $callback ) {
+ try {
+ list( $phpCallback ) = $callback;
+ $phpCallback( $trigger, $this );
+ } catch ( Exception $ex ) {
+ MWExceptionHandler::logException( $ex );
+ $e = $e ?: $ex;
+ }
+ }
+
+ if ( $e instanceof Exception ) {
+ throw $e; // re-throw any first exception
}
}
final public function startAtomic( $fname = __METHOD__ ) {
if ( !$this->mTrxLevel ) {
- $this->begin( $fname );
- $this->mTrxAutomatic = true;
+ $this->begin( $fname, self::TRANSACTION_INTERNAL );
// If DBO_TRX is set, a series of startAtomic/endAtomic pairs will result
// in all changes being in one transaction to keep requests transactional.
if ( !$this->getFlag( DBO_TRX ) ) {
final public function endAtomic( $fname = __METHOD__ ) {
if ( !$this->mTrxLevel ) {
- throw new DBUnexpectedError( $this, 'No atomic transaction is open.' );
+ throw new DBUnexpectedError( $this, "No atomic transaction is open (got $fname)." );
}
if ( !$this->mTrxAtomicLevels ||
array_pop( $this->mTrxAtomicLevels ) !== $fname
) {
- throw new DBUnexpectedError( $this, 'Invalid atomic section ended.' );
+ throw new DBUnexpectedError( $this, "Invalid atomic section ended (got $fname)." );
}
if ( !$this->mTrxAtomicLevels && $this->mTrxAutomaticAtomic ) {
- $this->commit( $fname, 'flush' );
+ $this->commit( $fname, self::FLUSHING_INTERNAL );
}
}
final public function doAtomicSection( $fname, callable $callback ) {
$this->startAtomic( $fname );
try {
- call_user_func_array( $callback, [ $this, $fname ] );
+ $res = call_user_func_array( $callback, [ $this, $fname ] );
} catch ( Exception $e ) {
- $this->rollback( $fname );
+ $this->rollback( $fname, self::FLUSHING_INTERNAL );
throw $e;
}
$this->endAtomic( $fname );
+
+ return $res;
}
- final public function begin( $fname = __METHOD__ ) {
- if ( $this->mTrxLevel ) { // implicit commit
+ final public function begin( $fname = __METHOD__, $mode = self::TRANSACTION_EXPLICIT ) {
+ // Protect against mismatched atomic section, transaction nesting, and snapshot loss
+ if ( $this->mTrxLevel ) {
if ( $this->mTrxAtomicLevels ) {
- // If the current transaction was an automatic atomic one, then we definitely have
- // a problem. Same if there is any unclosed atomic level.
$levels = implode( ', ', $this->mTrxAtomicLevels );
- throw new DBUnexpectedError(
- $this,
- "Got explicit BEGIN from $fname while atomic section(s) $levels are open."
- );
+ $msg = "$fname: Got explicit BEGIN while atomic section(s) $levels are open.";
+ throw new DBUnexpectedError( $this, $msg );
} elseif ( !$this->mTrxAutomatic ) {
- // We want to warn about inadvertently nested begin/commit pairs, but not about
- // auto-committing implicit transactions that were started by query() via DBO_TRX
- throw new DBUnexpectedError(
- $this,
- "$fname: Transaction already in progress (from {$this->mTrxFname}), " .
- " performing implicit commit!"
- );
+ $msg = "$fname: Explicit transaction already active (from {$this->mTrxFname}).";
+ throw new DBUnexpectedError( $this, $msg );
} else {
- // The transaction was automatic and has done write operations
- if ( $this->mTrxDoneWrites ) {
- wfDebug( "$fname: Automatic transaction with writes in progress" .
- " (from {$this->mTrxFname}), performing implicit commit!\n"
- );
- }
- }
-
- $this->runOnTransactionPreCommitCallbacks();
- $writeTime = $this->pendingWriteQueryDuration();
- $this->doCommit( $fname );
- if ( $this->mTrxDoneWrites ) {
- $this->mDoneWrites = microtime( true );
- $this->getTransactionProfiler()->transactionWritingOut(
- $this->mServer, $this->mDBname, $this->mTrxShortId, $writeTime );
+ // @TODO: make this an exception at some point
+ $msg = "$fname: Implicit transaction already active (from {$this->mTrxFname}).";
+ wfLogDBError( $msg );
+ wfWarn( $msg );
+ return; // join the main transaction set
}
- $this->runOnTransactionIdleCallbacks();
+ } elseif ( $this->getFlag( DBO_TRX ) && $mode !== self::TRANSACTION_INTERNAL ) {
+ // @TODO: make this an exception at some point
+ $msg = "$fname: Implicit transaction expected (DBO_TRX set).";
+ wfLogDBError( $msg );
+ wfWarn( $msg );
+ return; // let any writes be in the main transaction
}
- # Avoid fatals if close() was called
+ // Avoid fatals if close() was called
$this->assertOpen();
$this->doBegin( $fname );
$this->mTrxTimestamp = microtime( true );
$this->mTrxFname = $fname;
$this->mTrxDoneWrites = false;
- $this->mTrxAutomatic = false;
+ $this->mTrxAutomatic = ( $mode === self::TRANSACTION_INTERNAL );
$this->mTrxAutomaticAtomic = false;
$this->mTrxAtomicLevels = [];
- $this->mTrxIdleCallbacks = [];
- $this->mTrxPreCommitCallbacks = [];
$this->mTrxShortId = wfRandomString( 12 );
$this->mTrxWriteDuration = 0.0;
+ $this->mTrxWriteQueryCount = 0;
+ $this->mTrxWriteAdjDuration = 0.0;
+ $this->mTrxWriteAdjQueryCount = 0;
$this->mTrxWriteCallers = [];
// First SELECT after BEGIN will establish the snapshot in REPEATABLE-READ.
- // Get an estimate of the slave lag before then, treating estimate staleness
+ // Get an estimate of the replica DB lag before then, treating estimate staleness
// as lag itself just to be safe
$status = $this->getApproximateLagStatus();
- $this->mTrxSlaveLag = $status['lag'] + ( microtime( true ) - $status['since'] );
+ $this->mTrxReplicaLag = $status['lag'] + ( microtime( true ) - $status['since'] );
}
/**
$levels = implode( ', ', $this->mTrxAtomicLevels );
throw new DBUnexpectedError(
$this,
- "Got COMMIT while atomic sections $levels are still open"
+ "$fname: Got COMMIT while atomic sections $levels are still open."
);
}
- if ( $flush === 'flush' ) {
+ if ( $flush === self::FLUSHING_INTERNAL || $flush === self::FLUSHING_ALL_PEERS ) {
if ( !$this->mTrxLevel ) {
return; // nothing to do
} elseif ( !$this->mTrxAutomatic ) {
throw new DBUnexpectedError(
$this,
- "$fname: Flushing an explicit transaction, getting out of sync!"
+ "$fname: Flushing an explicit transaction, getting out of sync."
);
}
} else {
if ( !$this->mTrxLevel ) {
- wfWarn( "$fname: No transaction to commit, something got out of sync!" );
+ wfWarn( "$fname: No transaction to commit, something got out of sync." );
return; // nothing to do
} elseif ( $this->mTrxAutomatic ) {
- wfWarn( "$fname: Explicit commit of implicit transaction. Something may be out of sync!" );
+ // @TODO: make this an exception at some point
+ $msg = "$fname: Explicit commit of implicit transaction.";
+ wfLogDBError( $msg );
+ wfWarn( $msg );
+ return; // wait for the main transaction set commit round
}
}
- # Avoid fatals if close() was called
+ // Avoid fatals if close() was called
$this->assertOpen();
$this->runOnTransactionPreCommitCallbacks();
- $writeTime = $this->pendingWriteQueryDuration();
+ $writeTime = $this->pendingWriteQueryDuration( self::ESTIMATE_DB_APPLY );
$this->doCommit( $fname );
if ( $this->mTrxDoneWrites ) {
$this->mDoneWrites = microtime( true );
$this->getTransactionProfiler()->transactionWritingOut(
$this->mServer, $this->mDBname, $this->mTrxShortId, $writeTime );
}
- $this->runOnTransactionIdleCallbacks();
+
+ $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT );
+ $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT );
}
/**
}
final public function rollback( $fname = __METHOD__, $flush = '' ) {
- if ( $flush !== 'flush' ) {
+ if ( $flush === self::FLUSHING_INTERNAL || $flush === self::FLUSHING_ALL_PEERS ) {
if ( !$this->mTrxLevel ) {
- wfWarn( "$fname: No transaction to rollback, something got out of sync!" );
return; // nothing to do
}
} else {
if ( !$this->mTrxLevel ) {
+ wfWarn( "$fname: No transaction to rollback, something got out of sync." );
return; // nothing to do
+ } elseif ( $this->getFlag( DBO_TRX ) ) {
+ throw new DBUnexpectedError(
+ $this,
+ "$fname: Expected mass rollback of all peer databases (DBO_TRX set)."
+ );
}
}
- # Avoid fatals if close() was called
+ // Avoid fatals if close() was called
$this->assertOpen();
$this->doRollback( $fname );
- $this->mTrxIdleCallbacks = []; // cancel
- $this->mTrxPreCommitCallbacks = []; // cancel
$this->mTrxAtomicLevels = [];
if ( $this->mTrxDoneWrites ) {
$this->getTransactionProfiler()->transactionWritingOut(
$this->mServer, $this->mDBname, $this->mTrxShortId );
}
+
+ $this->mTrxIdleCallbacks = []; // clear
+ $this->mTrxPreCommitCallbacks = []; // clear
+ $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK );
+ $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK );
}
/**
*/
protected function doRollback( $fname ) {
if ( $this->mTrxLevel ) {
- $this->query( 'ROLLBACK', $fname, true );
+ # Disconnects cause rollback anyway, so ignore those errors
+ $ignoreErrors = true;
+ $this->query( 'ROLLBACK', $fname, $ignoreErrors );
$this->mTrxLevel = 0;
}
}
+ public function flushSnapshot( $fname = __METHOD__ ) {
+ if ( $this->writesOrCallbacksPending() || $this->explicitTrxActive() ) {
+ // This only flushes transactions to clear snapshots, not to write data
+ throw new DBUnexpectedError(
+ $this,
+ "$fname: Cannot COMMIT to clear snapshot because writes are pending."
+ );
+ }
+
+ $this->commit( $fname, self::FLUSHING_INTERNAL );
+ }
+
+ public function explicitTrxActive() {
+ return $this->mTrxLevel && ( $this->mTrxAtomicLevels || !$this->mTrxAutomatic );
+ }
+
/**
* Creates a new table with structure copied from existing table
* Note that unlike most database abstraction functions, this function does not
}
}
- public function ping() {
- # Stub. Not essential to override.
- return true;
+ public function ping( &$rtt = null ) {
+ // Avoid hitting the server if it was hit recently
+ if ( $this->isOpen() && ( microtime( true ) - $this->lastPing ) < self::PING_TTL ) {
+ if ( !func_num_args() || $this->mRTTEstimate > 0 ) {
+ $rtt = $this->mRTTEstimate;
+ return true; // don't care about $rtt
+ }
+ }
+
+ // This will reconnect if possible or return false if not
+ $this->clearFlag( DBO_TRX, self::REMEMBER_PRIOR );
+ $ok = ( $this->query( self::PING_QUERY, __METHOD__, true ) !== false );
+ $this->restoreFlags( self::RESTORE_PRIOR );
+
+ if ( $ok ) {
+ $rtt = $this->mRTTEstimate;
+ }
+
+ return $ok;
+ }
+
+ /**
+ * @return bool
+ */
+ protected function reconnect() {
+ $this->closeConnection();
+ $this->mOpened = false;
+ $this->mConn = false;
+ try {
+ $this->open( $this->mServer, $this->mUser, $this->mPassword, $this->mDBname );
+ $this->lastPing = microtime( true );
+ $ok = true;
+ } catch ( DBConnectionError $e ) {
+ $ok = false;
+ }
+
+ return $ok;
}
public function getSessionLagStatus() {
}
/**
- * Get the slave lag when the current transaction started
+ * Get the replica DB lag when the current transaction started
*
* This is useful when transactions might use snapshot isolation
* (e.g. REPEATABLE-READ in innodb), so the "real" lag of that data
*/
public function getTransactionLagStatus() {
return $this->mTrxLevel
- ? [ 'lag' => $this->mTrxSlaveLag, 'since' => $this->trxTimestamp() ]
+ ? [ 'lag' => $this->mTrxReplicaLag, 'since' => $this->trxTimestamp() ]
: null;
}
/**
- * Get a slave lag estimate for this server
+ * Get a replica DB lag estimate for this server
*
* @return array ('lag': seconds or false on error, 'since': UNIX timestamp of estimate)
* @since 1.27
*/
public function getApproximateLagStatus() {
return [
- 'lag' => $this->getLBInfo( 'slave' ) ? $this->getLag() : 0,
+ 'lag' => $this->getLBInfo( 'replica' ) ? $this->getLag() : 0,
'since' => microtime( true )
];
}
}
public function getScopedLockAndFlush( $lockKey, $fname, $timeout ) {
+ if ( $this->writesOrCallbacksPending() ) {
+ // This only flushes transactions to clear snapshots, not to write data
+ throw new DBUnexpectedError(
+ $this,
+ "$fname: Cannot COMMIT to clear snapshot because writes are pending."
+ );
+ }
+
if ( !$this->lock( $lockKey, $fname, $timeout ) ) {
return null;
}
$unlocker = new ScopedCallback( function () use ( $lockKey, $fname ) {
- $this->commit( __METHOD__, 'flush' );
- $this->unlock( $lockKey, $fname );
+ if ( $this->trxLevel() ) {
+ // There is a good chance an exception was thrown, causing any early return
+ // from the caller. Let any error handler get a chance to issue rollback().
+ // If there isn't one, let the error bubble up and trigger server-side rollback.
+ $this->onTransactionResolution( function () use ( $lockKey, $fname ) {
+ $this->unlock( $lockKey, $fname );
+ } );
+ } else {
+ $this->unlock( $lockKey, $fname );
+ }
} );
- $this->commit( __METHOD__, 'flush' );
+ $this->commit( __METHOD__, self::FLUSHING_INTERNAL );
return $unlocker;
}
if ( $this->mTrxLevel && $this->mTrxDoneWrites ) {
trigger_error( "Uncommitted DB writes (transaction from {$this->mTrxFname})." );
}
- if ( count( $this->mTrxIdleCallbacks ) || count( $this->mTrxPreCommitCallbacks ) ) {
+ $danglingCallbacks = array_merge(
+ $this->mTrxIdleCallbacks,
+ $this->mTrxPreCommitCallbacks,
+ $this->mTrxEndCallbacks
+ );
+ if ( $danglingCallbacks ) {
$callers = [];
- foreach ( $this->mTrxIdleCallbacks as $callbackInfo ) {
+ foreach ( $danglingCallbacks as $callbackInfo ) {
$callers[] = $callbackInfo[1];
}
$callers = implode( ', ', $callers );
*/
abstract class DatabaseMysqlBase extends Database {
/** @var MysqlMasterPos */
- protected $lastKnownSlavePos;
- /** @var string Method to detect slave lag */
+ protected $lastKnownReplicaPos;
+ /** @var string Method to detect replica DB lag */
protected $lagDetectionMethod;
- /** @var array Method to detect slave lag */
+ /** @var array Method to detect replica DB lag */
protected $lagDetectionOptions = [];
-
+ /** @var bool bool Whether to use GTID methods */
+ protected $useGTIDs = false;
+ /** @var string|null */
+ protected $sslKeyPath;
+ /** @var string|null */
+ protected $sslCertPath;
+ /** @var string|null */
+ protected $sslCAPath;
+ /** @var string[]|null */
+ protected $sslCiphers;
/** @var string|null */
private $serverVersion = null;
/**
* Additional $params include:
* - lagDetectionMethod : set to one of (Seconds_Behind_Master,pt-heartbeat).
- * pt-heartbeat assumes the table is at heartbeat.heartbeat
- * and uses UTC timestamps in the heartbeat.ts column.
- * (https://www.percona.com/doc/percona-toolkit/2.2/pt-heartbeat.html)
+ * pt-heartbeat assumes the table is at heartbeat.heartbeat
+ * and uses UTC timestamps in the heartbeat.ts column.
+ * (https://www.percona.com/doc/percona-toolkit/2.2/pt-heartbeat.html)
* - lagDetectionOptions : if using pt-heartbeat, this can be set to an array map to change
- * the default behavior. Normally, the heartbeat row with the server
- * ID of this server's master will be used. Set the "conds" field to
- * override the query conditions, e.g. ['shard' => 's1'].
+ * the default behavior. Normally, the heartbeat row with the server
+ * ID of this server's master will be used. Set the "conds" field to
+ * override the query conditions, e.g. ['shard' => 's1'].
+ * - useGTIDs : use GTID methods like MASTER_GTID_WAIT() when possible.
+ * - sslKeyPath : path to key file [default: null]
+ * - sslCertPath : path to certificate file [default: null]
+ * - sslCAPath : parth to certificate authority PEM files [default: null]
+ * - sslCiphers : array list of allowable ciphers [default: null]
* @param array $params
*/
function __construct( array $params ) {
$this->lagDetectionOptions = isset( $params['lagDetectionOptions'] )
? $params['lagDetectionOptions']
: [];
+ $this->useGTIDs = !empty( $params['useGTIDs' ] );
+ foreach ( [ 'KeyPath', 'CertPath', 'CAPath', 'Ciphers' ] as $name ) {
+ $var = "ssl{$name}";
+ if ( isset( $params[$var] ) ) {
+ $this->$var = $params[$var];
+ }
+ }
}
/**
* @return string
*/
function strencode( $s ) {
- $sQuoted = $this->mysqlRealEscapeString( $s );
-
- if ( $sQuoted === false ) {
- $this->ping();
- $sQuoted = $this->mysqlRealEscapeString( $s );
- }
-
- return $sQuoted;
+ return $this->mysqlRealEscapeString( $s );
}
/**
return strlen( $name ) && $name[0] == '`' && substr( $name, -1, 1 ) == '`';
}
- /**
- * @return bool
- */
- function ping() {
- $ping = $this->mysqlPing();
- if ( $ping ) {
- // Connection was good or lost but reconnected...
- // @note: mysqlnd (php 5.6+) does not support this (PHP bug 52561)
- return true;
- }
-
- // Try a full disconnect/reconnect cycle if ping() failed
- $this->closeConnection();
- $this->mOpened = false;
- $this->mConn = false;
- $this->open( $this->mServer, $this->mUser, $this->mPassword, $this->mDBname );
-
- return true;
- }
-
- /**
- * Ping a server connection or reconnect if there is no connection
- *
- * @return bool
- */
- abstract protected function mysqlPing();
-
function getLag() {
if ( $this->getLagDetectionMethod() === 'pt-heartbeat' ) {
return $this->getLagFromPtHeartbeat();
$key = $cache->makeGlobalKey(
'mysql',
'master-info',
- // Using one key for all cluster slaves is preferable
+ // Using one key for all cluster replica DBs is preferable
$this->getLBInfo( 'clusterMasterHost' ) ?: $this->getServer()
);
if ( $this->getLBInfo( 'is static' ) === true ) {
return 0; // this is a copy of a read-only dataset with no master DB
- } elseif ( $this->lastKnownSlavePos && $this->lastKnownSlavePos->hasReached( $pos ) ) {
+ } elseif ( $this->lastKnownReplicaPos && $this->lastKnownReplicaPos->hasReached( $pos ) ) {
return 0; // already reached this point for sure
}
- # Commit any open transactions
- $this->commit( __METHOD__, 'flush' );
-
- # Call doQuery() directly, to avoid opening a transaction if DBO_TRX is set
- $encFile = $this->addQuotes( $pos->file );
- $encPos = intval( $pos->pos );
- $res = $this->doQuery( "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)" );
+ // Call doQuery() directly, to avoid opening a transaction if DBO_TRX is set
+ if ( $this->useGTIDs && $pos->gtids ) {
+ // Wait on the GTID set (MariaDB only)
+ $gtidArg = $this->addQuotes( implode( ',', $pos->gtids ) );
+ $res = $this->doQuery( "SELECT MASTER_GTID_WAIT($gtidArg, $timeout)" );
+ } else {
+ // Wait on the binlog coordinates
+ $encFile = $this->addQuotes( $pos->file );
+ $encPos = intval( $pos->pos );
+ $res = $this->doQuery( "SELECT MASTER_POS_WAIT($encFile, $encPos, $timeout)" );
+ }
$row = $res ? $this->fetchRow( $res ) : false;
if ( !$row ) {
if ( $status === null ) {
// T126436: jobs programmed to wait on master positions might be referencing binlogs
// with an old master hostname. Such calls make MASTER_POS_WAIT() return null. Try
- // to detect this and treat the slave as having reached the position; a proper master
+ // to detect this and treat the replica DB as having reached the position; a proper master
// switchover already requires that the new master be caught up before the switch.
- $slavePos = $this->getSlavePos();
- if ( $slavePos && !$slavePos->channelsMatch( $pos ) ) {
- $this->lastKnownSlavePos = $slavePos;
+ $replicationPos = $this->getSlavePos();
+ if ( $replicationPos && !$replicationPos->channelsMatch( $pos ) ) {
+ $this->lastKnownReplicaPos = $replicationPos;
$status = 0;
}
} elseif ( $status >= 0 ) {
// Remember that this position was reached to save queries next time
- $this->lastKnownSlavePos = $pos;
+ $this->lastKnownReplicaPos = $pos;
}
return $status;
* @return MySQLMasterPos|bool
*/
function getSlavePos() {
- $res = $this->query( 'SHOW SLAVE STATUS', 'DatabaseBase::getSlavePos' );
+ $res = $this->query( 'SHOW SLAVE STATUS', __METHOD__ );
$row = $this->fetchObject( $res );
if ( $row ) {
$pos = isset( $row->Exec_master_log_pos )
? $row->Exec_master_log_pos
: $row->Exec_Master_Log_Pos;
+ // Also fetch the last-applied GTID set (MariaDB)
+ if ( $this->useGTIDs ) {
+ $res = $this->query( "SHOW GLOBAL VARIABLES LIKE 'gtid_slave_pos'", __METHOD__ );
+ $gtidRow = $this->fetchObject( $res );
+ $gtidSet = $gtidRow ? $gtidRow->Value : '';
+ } else {
+ $gtidSet = '';
+ }
- return new MySQLMasterPos( $row->Relay_Master_Log_File, $pos );
+ return new MySQLMasterPos( $row->Relay_Master_Log_File, $pos, $gtidSet );
} else {
return false;
}
* @return MySQLMasterPos|bool
*/
function getMasterPos() {
- $res = $this->query( 'SHOW MASTER STATUS', 'DatabaseBase::getMasterPos' );
+ $res = $this->query( 'SHOW MASTER STATUS', __METHOD__ );
$row = $this->fetchObject( $res );
if ( $row ) {
- return new MySQLMasterPos( $row->File, $row->Position );
+ // Also fetch the last-written GTID set (MariaDB)
+ if ( $this->useGTIDs ) {
+ $res = $this->query( "SHOW GLOBAL VARIABLES LIKE 'gtid_binlog_pos'", __METHOD__ );
+ $gtidRow = $this->fetchObject( $res );
+ $gtidSet = $gtidRow ? $gtidRow->Value : '';
+ } else {
+ $gtidSet = '';
+ }
+
+ return new MySQLMasterPos( $row->File, $row->Position, $gtidSet );
} else {
return false;
}
}
+ public function serverIsReadOnly() {
+ $res = $this->query( "SHOW GLOBAL VARIABLES LIKE 'read_only'", __METHOD__ );
+ $row = $this->fetchObject( $res );
+
+ return $row ? ( strtolower( $row->Value ) === 'on' ) : false;
+ }
+
/**
* @param string $index
* @return string
*/
function useIndexClause( $index ) {
return "FORCE INDEX (" . $this->indexName( $index ) . ")";
+ }
+
+ /**
+ * @param string $index
+ * @return string
+ */
+ function ignoreIndexClause( $index ) {
+ return "IGNORE INDEX (" . $this->indexName( $index ) . ")";
}
/**
return $this->lastErrno() == 1205;
}
- /**
- * Determines if the last query error was something that should be dealt
- * with by pinging the connection and reissuing the query
- *
- * @return bool
- */
function wasErrorReissuable() {
return $this->lastErrno() == 2013 || $this->lastErrno() == 2006;
}
}
}
+/**
+ * DBMasterPos class for MySQL/MariaDB
+ *
+ * Note that master positions and sync logic here make some assumptions:
+ * - Binlog-based usage assumes single-source replication and non-hierarchical replication.
+ * - GTID-based usage allows getting/syncing with multi-source replication. It is assumed
+ * that GTID sets are complete (e.g. include all domains on the server).
+ */
class MySQLMasterPos implements DBMasterPos {
- /** @var string */
+ /** @var string Binlog file */
public $file;
- /** @var int Position */
+ /** @var int Binglog file position */
public $pos;
+ /** @var string[] GTID list */
+ public $gtids = [];
/** @var float UNIX timestamp */
public $asOfTime = 0.0;
- function __construct( $file, $pos ) {
+ /**
+ * @param string $file Binlog file name
+ * @param integer $pos Binlog position
+ * @param string $gtid Comma separated GTID set [optional]
+ */
+ function __construct( $file, $pos, $gtid = '' ) {
$this->file = $file;
$this->pos = $pos;
+ $this->gtids = array_map( 'trim', explode( ',', $gtid ) );
$this->asOfTime = microtime( true );
}
+ /**
+ * @return string <binlog file>/<position>, e.g db1034-bin.000976/843431247
+ */
+ function __toString() {
+ return "{$this->file}/{$this->pos}";
+ }
+
function asOfTime() {
return $this->asOfTime;
}
throw new InvalidArgumentException( "Position not an instance of " . __CLASS__ );
}
- $thisPos = $this->getCoordinates();
- $thatPos = $pos->getCoordinates();
+ // Prefer GTID comparisons, which work with multi-tier replication
+ $thisPosByDomain = $this->getGtidCoordinates();
+ $thatPosByDomain = $pos->getGtidCoordinates();
+ if ( $thisPosByDomain && $thatPosByDomain ) {
+ $reached = true;
+ // Check that this has positions GTE all of those in $pos for all domains in $pos
+ foreach ( $thatPosByDomain as $domain => $thatPos ) {
+ $thisPos = isset( $thisPosByDomain[$domain] ) ? $thisPosByDomain[$domain] : -1;
+ $reached = $reached && ( $thatPos <= $thisPos );
+ }
+
+ return $reached;
+ }
+
+ // Fallback to the binlog file comparisons
+ $thisBinPos = $this->getBinlogCoordinates();
+ $thatBinPos = $pos->getBinlogCoordinates();
+ if ( $thisBinPos && $thatBinPos && $thisBinPos['binlog'] === $thatBinPos['binlog'] ) {
+ return ( $thisBinPos['pos'] >= $thatBinPos['pos'] );
+ }
- return ( $thisPos && $thatPos && $thisPos >= $thatPos );
+ // Comparing totally different binlogs does not make sense
+ return false;
}
function channelsMatch( DBMasterPos $pos ) {
throw new InvalidArgumentException( "Position not an instance of " . __CLASS__ );
}
- $thisBinlog = $this->getBinlogName();
- $thatBinlog = $pos->getBinlogName();
+ // Prefer GTID comparisons, which work with multi-tier replication
+ $thisPosDomains = array_keys( $this->getGtidCoordinates() );
+ $thatPosDomains = array_keys( $pos->getGtidCoordinates() );
+ if ( $thisPosDomains && $thatPosDomains ) {
+ // Check that this has GTIDs for all domains in $pos
+ return !array_diff( $thatPosDomains, $thisPosDomains );
+ }
- return ( $thisBinlog !== false && $thisBinlog === $thatBinlog );
- }
+ // Fallback to the binlog file comparisons
+ $thisBinPos = $this->getBinlogCoordinates();
+ $thatBinPos = $pos->getBinlogCoordinates();
- function __toString() {
- // e.g db1034-bin.000976/843431247
- return "{$this->file}/{$this->pos}";
+ return ( $thisBinPos && $thatBinPos && $thisBinPos['binlog'] === $thatBinPos['binlog'] );
}
/**
- * @return string|bool
+ * @note: this returns false for multi-source replication GTID sets
+ * @see https://mariadb.com/kb/en/mariadb/gtid
+ * @see https://dev.mysql.com/doc/refman/5.6/en/replication-gtids-concepts.html
+ * @return array Map of (domain => integer position) or false
*/
- protected function getBinlogName() {
- $m = [];
- if ( preg_match( '!^(.+)\.(\d+)/(\d+)$!', (string)$this, $m ) ) {
- return $m[1];
+ protected function getGtidCoordinates() {
+ $gtidInfos = [];
+ foreach ( $this->gtids as $gtid ) {
+ $m = [];
+ // MariaDB style: <domain>-<server id>-<sequence number>
+ if ( preg_match( '!^(\d+)-\d+-(\d+)$!', $gtid, $m ) ) {
+ $gtidInfos[(int)$m[1]] = (int)$m[2];
+ // MySQL style: <UUID domain>:<sequence number>
+ } elseif ( preg_match( '!^(\w{8}-\w{4}-\w{4}-\w{4}-\w{12}):(\d+)$!', $gtid, $m ) ) {
+ $gtidInfos[$m[1]] = (int)$m[2];
+ } else {
+ $gtidInfos = [];
+ break; // unrecognized GTID
+ }
+
}
- return false;
+ return $gtidInfos;
}
/**
- * @return array|bool (int, int)
+ * @see http://dev.mysql.com/doc/refman/5.7/en/show-master-status.html
+ * @see http://dev.mysql.com/doc/refman/5.7/en/show-slave-status.html
+ * @return array|bool (binlog, (integer file number, integer position)) or false
*/
- protected function getCoordinates() {
+ protected function getBinlogCoordinates() {
$m = [];
- if ( preg_match( '!\.(\d+)/(\d+)$!', (string)$this, $m ) ) {
- return [ (int)$m[1], (int)$m[2] ];
+ if ( preg_match( '!^(.+)\.(\d+)/(\d+)$!', (string)$this, $m ) ) {
+ return [ 'binlog' => $m[1], 'pos' => [ (int)$m[2], (int)$m[3] ] ];
}
return false;