X-Git-Url: https://git.heureux-cyclage.org/?a=blobdiff_plain;f=includes%2Fdb%2FDatabase.php;h=59c1207e49ae620fdb0374085c20e68c68088322;hb=4a975b8099ee11b15421d03be02206935a8422f1;hp=be0399d62f4141fa8fc2c8c41c2851cc92a0e472;hpb=99fefe9af6c35ded8e7c9da7bd099719f7133054;p=lhc%2Fweb%2Fwiklou.git diff --git a/includes/db/Database.php b/includes/db/Database.php index be0399d62f..0a1774dedb 100644 --- a/includes/db/Database.php +++ b/includes/db/Database.php @@ -32,24 +32,42 @@ abstract class DatabaseBase implements IDatabase { /** Number of times to re-try an operation in case of deadlock */ const DEADLOCK_TRIES = 4; - /** Minimum time to wait before retry, in microseconds */ const DEADLOCK_DELAY_MIN = 500000; - /** Maximum time to wait before retry */ const DEADLOCK_DELAY_MAX = 1500000; + /** How long before it is worth doing a dummy query to test the connection */ + const PING_TTL = 1.0; + const PING_QUERY = 'SELECT 1 AS ping'; + + const TINY_WRITE_SEC = .010; + const SLOW_WRITE_SEC = .500; + const SMALL_WRITE_ROWS = 100; + + /** @var string SQL query */ protected $mLastQuery = ''; + /** @var bool */ protected $mDoneWrites = false; + /** @var string|bool */ protected $mPHPError = false; - - protected $mServer, $mUser, $mPassword, $mDBname; + /** @var string */ + protected $mServer; + /** @var string */ + protected $mUser; + /** @var string */ + protected $mPassword; + /** @var string */ + protected $mDBname; + /** @var bool */ + protected $cliMode; /** @var BagOStuff APC cache */ protected $srvCache; /** @var resource Database connection */ protected $mConn = null; + /** @var bool */ protected $mOpened = false; /** @var array[] List of (callable, method name) */ @@ -58,23 +76,32 @@ abstract class DatabaseBase implements IDatabase { protected $mTrxPreCommitCallbacks = []; /** @var array[] List of (callable, method name) */ protected $mTrxEndCallbacks = []; - /** @var bool Whether to suppress triggering of post-commit callbacks */ - protected $suppressPostCommitCallbacks = false; + /** @var array[] Map of (name => (callable, method name)) */ + protected $mTrxRecurringCallbacks = []; + /** @var bool Whether to suppress triggering of transaction end callbacks */ + protected $mTrxEndCallbacksSuppressed = false; + /** @var string */ protected $mTablePrefix; + /** @var string */ protected $mSchema; + /** @var integer */ protected $mFlags; + /** @var bool */ protected $mForeign; + /** @var array */ protected $mLBInfo = []; + /** @var bool|null */ protected $mDefaultBigSelects = null; + /** @var array|bool */ protected $mSchemaVars = false; /** @var array */ protected $mSessionVars = []; - + /** @var array|null */ protected $preparedArgs; - + /** @var string|bool|null Stashed value of html_errors INI setting */ protected $htmlErrors; - + /** @var string */ protected $delimiter = ';'; /** @@ -84,7 +111,6 @@ abstract class DatabaseBase implements IDatabase { * @var int */ protected $mTrxLevel = 0; - /** * Either a short hexidecimal string if a transaction is active or "" * @@ -92,7 +118,6 @@ abstract class DatabaseBase implements IDatabase { * @see DatabaseBase::mTrxLevel */ protected $mTrxShortId = ''; - /** * The UNIX time that the transaction started. Callers can assume that if * snapshot isolation is used, then the data is *at least* up to date to that @@ -102,10 +127,8 @@ abstract class DatabaseBase implements IDatabase { * @see DatabaseBase::mTrxLevel */ private $mTrxTimestamp = null; - /** @var float Lag estimate at the time of BEGIN */ - private $mTrxSlaveLag = null; - + private $mTrxReplicaLag = null; /** * Remembers the function name given for starting the most recent transaction via begin(). * Used to provide additional context for error reporting. @@ -114,7 +137,6 @@ abstract class DatabaseBase implements IDatabase { * @see DatabaseBase::mTrxLevel */ private $mTrxFname = null; - /** * Record if possible write queries were done in the last transaction started * @@ -122,7 +144,6 @@ abstract class DatabaseBase implements IDatabase { * @see DatabaseBase::mTrxLevel */ private $mTrxDoneWrites = false; - /** * Record if the current transaction was started implicitly due to DBO_TRX being set. * @@ -130,34 +151,44 @@ abstract class DatabaseBase implements IDatabase { * @see DatabaseBase::mTrxLevel */ private $mTrxAutomatic = false; - /** * Array of levels of atomicity within transactions * * @var array */ private $mTrxAtomicLevels = []; - /** * Record if the current transaction was started implicitly by DatabaseBase::startAtomic * * @var bool */ private $mTrxAutomaticAtomic = false; - /** * Track the write query callers of the current transaction * * @var string[] */ private $mTrxWriteCallers = []; - /** - * Track the seconds spent in write queries for the current transaction - * - * @var float + * @var float Seconds spent in write queries for the current transaction */ private $mTrxWriteDuration = 0.0; + /** + * @var integer Number of write queries for the current transaction + */ + private $mTrxWriteQueryCount = 0; + /** + * @var float Like mTrxWriteQueryCount but excludes lock-bound, easy to replicate, queries + */ + private $mTrxWriteAdjDuration = 0.0; + /** + * @var integer Number of write queries counted in mTrxWriteAdjDuration + */ + private $mTrxWriteAdjQueryCount = 0; + /** + * @var float RTT time estimate + */ + private $mRTTEstimate = 0.0; /** @var array Map of (name => 1) for locks obtained via lock() */ private $mNamedLocksHeld = []; @@ -177,6 +208,14 @@ abstract class DatabaseBase implements IDatabase { */ protected $allViews = null; + /** @var float UNIX timestamp */ + protected $lastPing = 0.0; + + /** @var int[] Prior mFlags values */ + private $priorFlags = []; + + /** @var Profiler */ + protected $profiler; /** @var TransactionProfiler */ protected $trxProfiler; @@ -296,10 +335,6 @@ abstract class DatabaseBase implements IDatabase { * @return TransactionProfiler */ protected function getTransactionProfiler() { - if ( !$this->trxProfiler ) { - $this->trxProfiler = new TransactionProfiler(); - } - return $this->trxProfiler; } @@ -397,8 +432,26 @@ abstract class DatabaseBase implements IDatabase { ); } - public function pendingWriteQueryDuration() { - return $this->mTrxLevel ? $this->mTrxWriteDuration : false; + public function pendingWriteQueryDuration( $type = self::ESTIMATE_TOTAL ) { + if ( !$this->mTrxLevel ) { + return false; + } elseif ( !$this->mTrxDoneWrites ) { + return 0.0; + } + + switch ( $type ) { + case self::ESTIMATE_DB_APPLY: + $this->ping( $rtt ); + $rttAdjTotal = $this->mTrxWriteAdjQueryCount * $rtt; + $applyTime = max( $this->mTrxWriteAdjDuration - $rttAdjTotal, 0 ); + // For omitted queries, make them count as something at least + $omitted = $this->mTrxWriteQueryCount - $this->mTrxWriteAdjQueryCount; + $applyTime += self::TINY_WRITE_SEC * $omitted; + + return $applyTime; + default: // everything + return $this->mTrxWriteDuration; + } } public function pendingWriteCallers() { @@ -409,14 +462,33 @@ abstract class DatabaseBase implements IDatabase { return $this->mOpened; } - public function setFlag( $flag ) { + public function setFlag( $flag, $remember = self::REMEMBER_NOTHING ) { + if ( $remember === self::REMEMBER_PRIOR ) { + array_push( $this->priorFlags, $this->mFlags ); + } $this->mFlags |= $flag; } - public function clearFlag( $flag ) { + public function clearFlag( $flag, $remember = self::REMEMBER_NOTHING ) { + if ( $remember === self::REMEMBER_PRIOR ) { + array_push( $this->priorFlags, $this->mFlags ); + } $this->mFlags &= ~$flag; } + public function restoreFlags( $state = self::RESTORE_PRIOR ) { + if ( !$this->priorFlags ) { + return; + } + + if ( $state === self::RESTORE_INITIAL ) { + $this->mFlags = reset( $this->priorFlags ); + $this->priorFlags = []; + } else { + $this->mFlags = array_pop( $this->priorFlags ); + } + } + public function getFlag( $flag ) { return !!( $this->mFlags & $flag ); } @@ -500,7 +572,7 @@ abstract class DatabaseBase implements IDatabase { * @param array $params Parameters passed from DatabaseBase::factory() */ function __construct( array $params ) { - global $wgDBprefix, $wgDBmwschema, $wgCommandLineMode; + global $wgDBprefix, $wgDBmwschema; $this->srvCache = ObjectCache::getLocalServerInstance( 'hash' ); @@ -513,9 +585,13 @@ abstract class DatabaseBase implements IDatabase { $schema = $params['schema']; $foreign = $params['foreign']; + $this->cliMode = isset( $params['cliMode'] ) + ? $params['cliMode'] + : ( PHP_SAPI === 'cli' ); + $this->mFlags = $flags; if ( $this->mFlags & DBO_DEFAULT ) { - if ( $wgCommandLineMode ) { + if ( $this->cliMode ) { $this->mFlags &= ~DBO_TRX; } else { $this->mFlags |= DBO_TRX; @@ -540,13 +616,17 @@ abstract class DatabaseBase implements IDatabase { $this->mForeign = $foreign; - if ( isset( $params['trxProfiler'] ) ) { - $this->trxProfiler = $params['trxProfiler']; // override - } + $this->profiler = isset( $params['profiler'] ) + ? $params['profiler'] + : Profiler::instance(); // @TODO: remove global state + $this->trxProfiler = isset( $params['trxProfiler'] ) + ? $params['trxProfiler'] + : new TransactionProfiler(); if ( $user ) { $this->open( $server, $user, $password, $dbName ); } + } /** @@ -582,6 +662,8 @@ abstract class DatabaseBase implements IDatabase { * @return DatabaseBase|null DatabaseBase subclass or null */ final public static function factory( $dbType, $p = [] ) { + global $wgCommandLineMode; + $canonicalDBTypes = [ 'mysql' => [ 'mysqli', 'mysql' ], 'postgres' => [], @@ -639,6 +721,7 @@ abstract class DatabaseBase implements IDatabase { $p['schema'] = isset( $defaultSchemas[$dbType] ) ? $defaultSchemas[$dbType] : null; } $p['foreign'] = isset( $p['foreign'] ) ? $p['foreign'] : false; + $p['cliMode'] = $wgCommandLineMode; return new $class( $p ); } else { @@ -763,7 +846,16 @@ abstract class DatabaseBase implements IDatabase { * @return bool */ protected function isWriteQuery( $sql ) { - return !preg_match( '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SET|SHOW|EXPLAIN|\(SELECT)\b/i', $sql ); + return !preg_match( + '/^(?:SELECT|BEGIN|ROLLBACK|COMMIT|SET|SHOW|EXPLAIN|\(SELECT)\b/i', $sql ); + } + + /** + * @param $sql + * @return string|null + */ + protected function getQueryVerb( $sql ) { + return preg_match( '/^\s*([a-z]+)/i', $sql, $m ) ? strtoupper( $m[1] ) : null; } /** @@ -776,8 +868,8 @@ abstract class DatabaseBase implements IDatabase { * @return bool */ protected function isTransactableQuery( $sql ) { - $verb = substr( $sql, 0, strcspn( $sql, " \t\r\n" ) ); - return !in_array( $verb, [ 'BEGIN', 'COMMIT', 'ROLLBACK', 'SHOW', 'SET' ] ); + $verb = $this->getQueryVerb( $sql ); + return !in_array( $verb, [ 'BEGIN', 'COMMIT', 'ROLLBACK', 'SHOW', 'SET' ], true ); } public function query( $sql, $fname = __METHOD__, $tempIgnore = false ) { @@ -786,8 +878,8 @@ abstract class DatabaseBase implements IDatabase { $priorWritesPending = $this->writesOrCallbacksPending(); $this->mLastQuery = $sql; - $isWriteQuery = $this->isWriteQuery( $sql ); - if ( $isWriteQuery ) { + $isWrite = $this->isWriteQuery( $sql ); + if ( $isWrite ) { $reason = $this->getReadOnlyReason(); if ( $reason !== false ) { throw new DBReadOnlyError( $this, "Database is read-only: $reason" ); @@ -820,49 +912,21 @@ abstract class DatabaseBase implements IDatabase { } # Keep track of whether the transaction has write queries pending - if ( $this->mTrxLevel && !$this->mTrxDoneWrites && $isWriteQuery ) { + if ( $this->mTrxLevel && !$this->mTrxDoneWrites && $isWrite ) { $this->mTrxDoneWrites = true; $this->getTransactionProfiler()->transactionWritingIn( $this->mServer, $this->mDBname, $this->mTrxShortId ); } - $isMaster = !is_null( $this->getLBInfo( 'master' ) ); - # generalizeSQL will probably cut down the query to reasonable - # logging size most of the time. The substr is really just a sanity check. - if ( $isMaster ) { - $queryProf = 'query-m: ' . substr( DatabaseBase::generalizeSQL( $sql ), 0, 255 ); - $totalProf = 'DatabaseBase::query-master'; - } else { - $queryProf = 'query: ' . substr( DatabaseBase::generalizeSQL( $sql ), 0, 255 ); - $totalProf = 'DatabaseBase::query'; - } - # Include query transaction state - $queryProf .= $this->mTrxShortId ? " [TRX#{$this->mTrxShortId}]" : ""; - - $profiler = Profiler::instance(); - if ( !$profiler instanceof ProfilerStub ) { - $totalProfSection = $profiler->scopedProfileIn( $totalProf ); - $queryProfSection = $profiler->scopedProfileIn( $queryProf ); - } - if ( $this->debug() ) { wfDebugLog( 'queries', sprintf( "%s: %s", $this->mDBname, $commentedSql ) ); } - $queryId = MWDebug::query( $sql, $fname, $isMaster ); - # Avoid fatals if close() was called $this->assertOpen(); - # Do the query and handle errors - $startTime = microtime( true ); - $ret = $this->doQuery( $commentedSql ); - $queryRuntime = microtime( true ) - $startTime; - # Log the query time and feed it into the DB trx profiler - $this->getTransactionProfiler()->recordQueryCompletion( - $queryProf, $startTime, $isWriteQuery, $this->affectedRows() ); - - MWDebug::queryTime( $queryId ); + # Send the query to the server + $ret = $this->doProfiledQuery( $sql, $commentedSql, $isWrite, $fname ); # Try reconnecting if the connection was lost if ( false === $ret && $this->wasErrorReissuable() ) { @@ -883,12 +947,7 @@ abstract class DatabaseBase implements IDatabase { $this->reportQueryError( $lastError, $lastErrno, $sql, $fname ); } else { # Should be safe to silently retry the query - $startTime = microtime( true ); - $ret = $this->doQuery( $commentedSql ); - $queryRuntime = microtime( true ) - $startTime; - # Log the query time and feed it into the DB trx profiler - $this->getTransactionProfiler()->recordQueryCompletion( - $queryProf, $startTime, $isWriteQuery, $this->affectedRows() ); + $ret = $this->doProfiledQuery( $sql, $commentedSql, $isWrite, $fname ); } } else { wfDebug( "Failed\n" ); @@ -913,16 +972,80 @@ abstract class DatabaseBase implements IDatabase { $res = $this->resultObject( $ret ); - // Destroy profile sections in the opposite order to their creation - ScopedCallback::consume( $queryProfSection ); - ScopedCallback::consume( $totalProfSection ); + return $res; + } + + private function doProfiledQuery( $sql, $commentedSql, $isWrite, $fname ) { + $isMaster = !is_null( $this->getLBInfo( 'master' ) ); + # generalizeSQL() will probably cut down the query to reasonable + # logging size most of the time. The substr is really just a sanity check. + if ( $isMaster ) { + $queryProf = 'query-m: ' . substr( DatabaseBase::generalizeSQL( $sql ), 0, 255 ); + } else { + $queryProf = 'query: ' . substr( DatabaseBase::generalizeSQL( $sql ), 0, 255 ); + } + + # Include query transaction state + $queryProf .= $this->mTrxShortId ? " [TRX#{$this->mTrxShortId}]" : ""; - if ( $isWriteQuery && $this->mTrxLevel ) { - $this->mTrxWriteDuration += $queryRuntime; - $this->mTrxWriteCallers[] = $fname; + $startTime = microtime( true ); + $this->profiler->profileIn( $queryProf ); + $ret = $this->doQuery( $commentedSql ); + $this->profiler->profileOut( $queryProf ); + $queryRuntime = max( microtime( true ) - $startTime, 0.0 ); + + unset( $queryProfSection ); // profile out (if set) + + if ( $ret !== false ) { + $this->lastPing = $startTime; + if ( $isWrite && $this->mTrxLevel ) { + $this->updateTrxWriteQueryTime( $sql, $queryRuntime ); + $this->mTrxWriteCallers[] = $fname; + } } - return $res; + if ( $sql === self::PING_QUERY ) { + $this->mRTTEstimate = $queryRuntime; + } + + $this->getTransactionProfiler()->recordQueryCompletion( + $queryProf, $startTime, $isWrite, $this->affectedRows() + ); + MWDebug::query( $sql, $fname, $isMaster, $queryRuntime ); + + return $ret; + } + + /** + * Update the estimated run-time of a query, not counting large row lock times + * + * LoadBalancer can be set to rollback transactions that will create huge replication + * lag. It bases this estimate off of pendingWriteQueryDuration(). Certain simple + * queries, like inserting a row can take a long time due to row locking. This method + * uses some simple heuristics to discount those cases. + * + * @param string $sql A SQL write query + * @param float $runtime Total runtime, including RTT + */ + private function updateTrxWriteQueryTime( $sql, $runtime ) { + // Whether this is indicative of replica DB runtime (except for RBR or ws_repl) + $indicativeOfReplicaRuntime = true; + if ( $runtime > self::SLOW_WRITE_SEC ) { + $verb = $this->getQueryVerb( $sql ); + // insert(), upsert(), replace() are fast unless bulky in size or blocked on locks + if ( $verb === 'INSERT' ) { + $indicativeOfReplicaRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS; + } elseif ( $verb === 'REPLACE' ) { + $indicativeOfReplicaRuntime = $this->affectedRows() > self::SMALL_WRITE_ROWS / 2; + } + } + + $this->mTrxWriteDuration += $runtime; + $this->mTrxWriteQueryCount += 1; + if ( $indicativeOfReplicaRuntime ) { + $this->mTrxWriteAdjDuration += $runtime; + $this->mTrxWriteAdjQueryCount += 1; + } } private function canRecoverFromDisconnect( $sql, $priorWritesPending ) { @@ -952,6 +1075,7 @@ abstract class DatabaseBase implements IDatabase { try { // Handle callbacks in mTrxEndCallbacks $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK ); + $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK ); return null; } catch ( Exception $e ) { // Already logged; move on... @@ -1220,8 +1344,13 @@ abstract class DatabaseBase implements IDatabase { } else { $useIndex = ''; } + if ( isset( $options['IGNORE INDEX'] ) && is_string( $options['IGNORE INDEX'] ) ) { + $ignoreIndex = $this->ignoreIndexClause( $options['IGNORE INDEX'] ); + } else { + $ignoreIndex = ''; + } - return [ $startOpts, $useIndex, $preLimitTail, $postLimitTail ]; + return [ $startOpts, $useIndex, $preLimitTail, $postLimitTail, $ignoreIndex ]; } /** @@ -1289,31 +1418,34 @@ abstract class DatabaseBase implements IDatabase { $useIndexes = ( isset( $options['USE INDEX'] ) && is_array( $options['USE INDEX'] ) ) ? $options['USE INDEX'] : []; + $ignoreIndexes = ( isset( $options['IGNORE INDEX'] ) && is_array( $options['IGNORE INDEX'] ) ) + ? $options['IGNORE INDEX'] + : []; if ( is_array( $table ) ) { $from = ' FROM ' . - $this->tableNamesWithUseIndexOrJOIN( $table, $useIndexes, $join_conds ); + $this->tableNamesWithIndexClauseOrJOIN( $table, $useIndexes, $ignoreIndexes, $join_conds ); } elseif ( $table != '' ) { if ( $table[0] == ' ' ) { $from = ' FROM ' . $table; } else { $from = ' FROM ' . - $this->tableNamesWithUseIndexOrJOIN( [ $table ], $useIndexes, [] ); + $this->tableNamesWithIndexClauseOrJOIN( [ $table ], $useIndexes, $ignoreIndexes, [] ); } } else { $from = ''; } - list( $startOpts, $useIndex, $preLimitTail, $postLimitTail ) = + list( $startOpts, $useIndex, $preLimitTail, $postLimitTail, $ignoreIndex ) = $this->makeSelectOptions( $options ); if ( !empty( $conds ) ) { if ( is_array( $conds ) ) { $conds = $this->makeList( $conds, LIST_AND ); } - $sql = "SELECT $startOpts $vars $from $useIndex WHERE $conds $preLimitTail"; + $sql = "SELECT $startOpts $vars $from $useIndex $ignoreIndex WHERE $conds $preLimitTail"; } else { - $sql = "SELECT $startOpts $vars $from $useIndex $preLimitTail"; + $sql = "SELECT $startOpts $vars $from $useIndex $ignoreIndex $preLimitTail"; } if ( isset( $options['LIMIT'] ) ) { @@ -1686,6 +1818,15 @@ abstract class DatabaseBase implements IDatabase { return '(' . $this->selectSQLText( $table, $fld, $conds, null, [], $join_conds ) . ')'; } + /** + * @param string $field Field or column to cast + * @return string + * @since 1.28 + */ + public function buildStringCast( $field ) { + return $field; + } + public function selectDB( $db ) { # Stub. Shouldn't cause serious problems if it's not overridden, but # if your database engine supports a concept similar to MySQL's @@ -1915,19 +2056,21 @@ abstract class DatabaseBase implements IDatabase { /** * Get the aliased table name clause for a FROM clause - * which might have a JOIN and/or USE INDEX clause + * which might have a JOIN and/or USE INDEX or IGNORE INDEX clause * * @param array $tables ( [alias] => table ) * @param array $use_index Same as for select() + * @param array $ignore_index Same as for select() * @param array $join_conds Same as for select() * @return string */ - protected function tableNamesWithUseIndexOrJOIN( - $tables, $use_index = [], $join_conds = [] + protected function tableNamesWithIndexClauseOrJOIN( + $tables, $use_index = [], $ignore_index = [], $join_conds = [] ) { $ret = []; $retJOIN = []; $use_index = (array)$use_index; + $ignore_index = (array)$ignore_index; $join_conds = (array)$join_conds; foreach ( $tables as $alias => $table ) { @@ -1946,6 +2089,12 @@ abstract class DatabaseBase implements IDatabase { $tableClause .= ' ' . $use; } } + if ( isset( $ignore_index[$alias] ) ) { // has IGNORE INDEX? + $ignore = $this->ignoreIndexClause( implode( ',', (array)$ignore_index[$alias] ) ); + if ( $ignore != '' ) { + $tableClause .= ' ' . $ignore; + } + } $on = $this->makeList( (array)$conds, LIST_AND ); if ( $on != '' ) { $tableClause .= ' ON (' . $on . ')'; @@ -1959,6 +2108,14 @@ abstract class DatabaseBase implements IDatabase { implode( ',', (array)$use_index[$alias] ) ); + $ret[] = $tableClause; + } elseif ( isset( $ignore_index[$alias] ) ) { + // Is there an INDEX clause for this table? + $tableClause = $this->tableNameWithAlias( $table, $alias ); + $tableClause .= ' ' . $this->ignoreIndexClause( + implode( ',', (array)$ignore_index[$alias] ) + ); + $ret[] = $tableClause; } else { $tableClause = $this->tableNameWithAlias( $table, $alias ); @@ -2091,6 +2248,20 @@ abstract class DatabaseBase implements IDatabase { return ''; } + /** + * IGNORE INDEX clause. Unlikely to be useful for anything but MySQL. This + * is only needed because a) MySQL must be as efficient as possible due to + * its use on Wikipedia, and b) MySQL 4.0 is kind of dumb sometimes about + * which index to pick. Anyway, other databases might have different + * indexes on a given table. So don't bother overriding this unless you're + * MySQL. + * @param string $index + * @return string + */ + public function ignoreIndexClause( $index ) { + return ''; + } + public function replace( $table, $uniqueIndexes, $rows, $fname = __METHOD__ ) { $quotedTable = $this->tableName( $table ); @@ -2216,12 +2387,12 @@ abstract class DatabaseBase implements IDatabase { $ok = $this->insert( $table, $rows, $fname, [ 'IGNORE' ] ) && $ok; } catch ( Exception $e ) { if ( $useTrx ) { - $this->rollback( $fname ); + $this->rollback( $fname, self::FLUSHING_INTERNAL ); } throw $e; } if ( $useTrx ) { - $this->commit( $fname, self::TRANSACTION_INTERNAL ); + $this->commit( $fname, self::FLUSHING_INTERNAL ); } return $ok; @@ -2300,7 +2471,46 @@ abstract class DatabaseBase implements IDatabase { return $this->query( $sql, $fname ); } - public function insertSelect( $destTable, $srcTable, $varMap, $conds, + public function insertSelect( + $destTable, $srcTable, $varMap, $conds, + $fname = __METHOD__, $insertOptions = [], $selectOptions = [] + ) { + if ( $this->cliMode ) { + // For massive migrations with downtime, we don't want to select everything + // into memory and OOM, so do all this native on the server side if possible. + return $this->nativeInsertSelect( + $destTable, + $srcTable, + $varMap, + $conds, + $fname, + $insertOptions, + $selectOptions + ); + } + + // For web requests, do a locking SELECT and then INSERT. This puts the SELECT burden + // on only the master (without needing row-based-replication). It also makes it easy to + // know how big the INSERT is going to be. + $fields = []; + foreach ( $varMap as $dstColumn => $sourceColumnOrSql ) { + $fields[] = $this->fieldNameWithAlias( $sourceColumnOrSql, $dstColumn ); + } + $selectOptions[] = 'FOR UPDATE'; + $res = $this->select( $srcTable, implode( ',', $fields ), $conds, $fname, $selectOptions ); + if ( !$res ) { + return false; + } + + $rows = []; + foreach ( $res as $row ) { + $rows[] = (array)$row; + } + + return $this->insert( $destTable, $rows, $fname, $insertOptions ); + } + + public function nativeInsertSelect( $destTable, $srcTable, $varMap, $conds, $fname = __METHOD__, $insertOptions = [], $selectOptions = [] ) { @@ -2316,7 +2526,8 @@ abstract class DatabaseBase implements IDatabase { $selectOptions = [ $selectOptions ]; } - list( $startOpts, $useIndex, $tailOpts ) = $this->makeSelectOptions( $selectOptions ); + list( $startOpts, $useIndex, $tailOpts, $ignoreIndex ) = $this->makeSelectOptions( + $selectOptions ); if ( is_array( $srcTable ) ) { $srcTable = implode( ',', array_map( [ &$this, 'tableName' ], $srcTable ) ); @@ -2326,7 +2537,7 @@ abstract class DatabaseBase implements IDatabase { $sql = "INSERT $insertOptions INTO $destTable (" . implode( ',', array_keys( $varMap ) ) . ')' . " SELECT $startOpts " . implode( ',', $varMap ) . - " FROM $srcTable $useIndex "; + " FROM $srcTable $useIndex $ignoreIndex "; if ( $conds != '*' ) { if ( is_array( $conds ) ) { @@ -2520,27 +2731,35 @@ abstract class DatabaseBase implements IDatabase { $this->mTrxPreCommitCallbacks[] = [ $callback, wfGetCaller() ]; } else { // If no transaction is active, then make one for this callback - $this->begin( __METHOD__, self::TRANSACTION_INTERNAL ); + $this->startAtomic( __METHOD__ ); try { call_user_func( $callback ); - $this->commit( __METHOD__ ); + $this->endAtomic( __METHOD__ ); } catch ( Exception $e ) { - $this->rollback( __METHOD__ ); + $this->rollback( __METHOD__, self::FLUSHING_INTERNAL ); throw $e; } } } + final public function setTransactionListener( $name, callable $callback = null ) { + if ( $callback ) { + $this->mTrxRecurringCallbacks[$name] = [ $callback, wfGetCaller() ]; + } else { + unset( $this->mTrxRecurringCallbacks[$name] ); + } + } + /** - * Whether to disable running of post-commit callbacks + * Whether to disable running of post-COMMIT/ROLLBACK callbacks * * This method should not be used outside of Database/LoadBalancer * * @param bool $suppress * @since 1.28 */ - final public function setPostCommitCallbackSupression( $suppress ) { - $this->suppressPostCommitCallbacks = $suppress; + final public function setTrxEndCallbackSuppression( $suppress ) { + $this->mTrxEndCallbacksSuppressed = $suppress; } /** @@ -2553,13 +2772,13 @@ abstract class DatabaseBase implements IDatabase { * @throws Exception */ public function runOnTransactionIdleCallbacks( $trigger ) { - if ( $this->suppressPostCommitCallbacks ) { + if ( $this->mTrxEndCallbacksSuppressed ) { return; } $autoTrx = $this->getFlag( DBO_TRX ); // automatic begin() enabled? /** @var Exception $e */ - $e = $ePrior = null; // last exception + $e = null; // first exception do { // callbacks may add callbacks :) $callbacks = array_merge( $this->mTrxIdleCallbacks, @@ -2577,22 +2796,20 @@ abstract class DatabaseBase implements IDatabase { } else { $this->clearFlag( DBO_TRX ); // restore auto-commit } - } catch ( Exception $e ) { - if ( $ePrior ) { - MWExceptionHandler::logException( $ePrior ); - } - $ePrior = $e; + } catch ( Exception $ex ) { + MWExceptionHandler::logException( $ex ); + $e = $e ?: $ex; // Some callbacks may use startAtomic/endAtomic, so make sure // their transactions are ended so other callbacks don't fail if ( $this->trxLevel() ) { - $this->rollback( __METHOD__ ); + $this->rollback( __METHOD__, self::FLUSHING_INTERNAL ); } } } } while ( count( $this->mTrxIdleCallbacks ) ); if ( $e instanceof Exception ) { - throw $e; // re-throw any last exception + throw $e; // re-throw any first exception } } @@ -2602,9 +2819,10 @@ abstract class DatabaseBase implements IDatabase { * This method should not be used outside of Database/LoadBalancer * * @since 1.22 + * @throws Exception */ public function runOnTransactionPreCommitCallbacks() { - $e = $ePrior = null; // last exception + $e = null; // first exception do { // callbacks may add callbacks :) $callbacks = $this->mTrxPreCommitCallbacks; $this->mTrxPreCommitCallbacks = []; // consumed (and recursion guard) @@ -2612,24 +2830,53 @@ abstract class DatabaseBase implements IDatabase { try { list( $phpCallback ) = $callback; call_user_func( $phpCallback ); - } catch ( Exception $e ) { - if ( $ePrior ) { - MWExceptionHandler::logException( $ePrior ); - } - $ePrior = $e; + } catch ( Exception $ex ) { + MWExceptionHandler::logException( $ex ); + $e = $e ?: $ex; } } } while ( count( $this->mTrxPreCommitCallbacks ) ); if ( $e instanceof Exception ) { - throw $e; // re-throw any last exception + throw $e; // re-throw any first exception + } + } + + /** + * Actually run any "transaction listener" callbacks. + * + * This method should not be used outside of Database/LoadBalancer + * + * @param integer $trigger IDatabase::TRIGGER_* constant + * @throws Exception + * @since 1.20 + */ + public function runTransactionListenerCallbacks( $trigger ) { + if ( $this->mTrxEndCallbacksSuppressed ) { + return; + } + + /** @var Exception $e */ + $e = null; // first exception + + foreach ( $this->mTrxRecurringCallbacks as $callback ) { + try { + list( $phpCallback ) = $callback; + $phpCallback( $trigger, $this ); + } catch ( Exception $ex ) { + MWExceptionHandler::logException( $ex ); + $e = $e ?: $ex; + } + } + + if ( $e instanceof Exception ) { + throw $e; // re-throw any first exception } } final public function startAtomic( $fname = __METHOD__ ) { if ( !$this->mTrxLevel ) { $this->begin( $fname, self::TRANSACTION_INTERNAL ); - $this->mTrxAutomatic = true; // If DBO_TRX is set, a series of startAtomic/endAtomic pairs will result // in all changes being in one transaction to keep requests transactional. if ( !$this->getFlag( DBO_TRX ) ) { @@ -2660,7 +2907,7 @@ abstract class DatabaseBase implements IDatabase { try { $res = call_user_func_array( $callback, [ $this, $fname ] ); } catch ( Exception $e ) { - $this->rollback( $fname ); + $this->rollback( $fname, self::FLUSHING_INTERNAL ); throw $e; } $this->endAtomic( $fname ); @@ -2673,7 +2920,7 @@ abstract class DatabaseBase implements IDatabase { if ( $this->mTrxLevel ) { if ( $this->mTrxAtomicLevels ) { $levels = implode( ', ', $this->mTrxAtomicLevels ); - $msg = "Got explicit BEGIN from $fname while atomic section(s) $levels are open."; + $msg = "$fname: Got explicit BEGIN while atomic section(s) $levels are open."; throw new DBUnexpectedError( $this, $msg ); } elseif ( !$this->mTrxAutomatic ) { $msg = "$fname: Explicit transaction already active (from {$this->mTrxFname})."; @@ -2682,11 +2929,14 @@ abstract class DatabaseBase implements IDatabase { // @TODO: make this an exception at some point $msg = "$fname: Implicit transaction already active (from {$this->mTrxFname})."; wfLogDBError( $msg ); + wfWarn( $msg ); return; // join the main transaction set } } elseif ( $this->getFlag( DBO_TRX ) && $mode !== self::TRANSACTION_INTERNAL ) { // @TODO: make this an exception at some point - wfLogDBError( "$fname: Implicit transaction expected (DBO_TRX set)." ); + $msg = "$fname: Implicit transaction expected (DBO_TRX set)."; + wfLogDBError( $msg ); + wfWarn( $msg ); return; // let any writes be in the main transaction } @@ -2697,17 +2947,20 @@ abstract class DatabaseBase implements IDatabase { $this->mTrxTimestamp = microtime( true ); $this->mTrxFname = $fname; $this->mTrxDoneWrites = false; - $this->mTrxAutomatic = false; + $this->mTrxAutomatic = ( $mode === self::TRANSACTION_INTERNAL ); $this->mTrxAutomaticAtomic = false; $this->mTrxAtomicLevels = []; $this->mTrxShortId = wfRandomString( 12 ); $this->mTrxWriteDuration = 0.0; + $this->mTrxWriteQueryCount = 0; + $this->mTrxWriteAdjDuration = 0.0; + $this->mTrxWriteAdjQueryCount = 0; $this->mTrxWriteCallers = []; // First SELECT after BEGIN will establish the snapshot in REPEATABLE-READ. - // Get an estimate of the slave lag before then, treating estimate staleness + // Get an estimate of the replica DB lag before then, treating estimate staleness // as lag itself just to be safe $status = $this->getApproximateLagStatus(); - $this->mTrxSlaveLag = $status['lag'] + ( microtime( true ) - $status['since'] ); + $this->mTrxReplicaLag = $status['lag'] + ( microtime( true ) - $status['since'] ); } /** @@ -2727,7 +2980,7 @@ abstract class DatabaseBase implements IDatabase { $levels = implode( ', ', $this->mTrxAtomicLevels ); throw new DBUnexpectedError( $this, - "Got COMMIT while atomic sections $levels are still open." + "$fname: Got COMMIT while atomic sections $levels are still open." ); } @@ -2746,7 +2999,9 @@ abstract class DatabaseBase implements IDatabase { return; // nothing to do } elseif ( $this->mTrxAutomatic ) { // @TODO: make this an exception at some point - wfLogDBError( "$fname: Explicit commit of implicit transaction." ); + $msg = "$fname: Explicit commit of implicit transaction."; + wfLogDBError( $msg ); + wfWarn( $msg ); return; // wait for the main transaction set commit round } } @@ -2755,7 +3010,7 @@ abstract class DatabaseBase implements IDatabase { $this->assertOpen(); $this->runOnTransactionPreCommitCallbacks(); - $writeTime = $this->pendingWriteQueryDuration(); + $writeTime = $this->pendingWriteQueryDuration( self::ESTIMATE_DB_APPLY ); $this->doCommit( $fname ); if ( $this->mTrxDoneWrites ) { $this->mDoneWrites = microtime( true ); @@ -2764,6 +3019,7 @@ abstract class DatabaseBase implements IDatabase { } $this->runOnTransactionIdleCallbacks( self::TRIGGER_COMMIT ); + $this->runTransactionListenerCallbacks( self::TRIGGER_COMMIT ); } /** @@ -2809,6 +3065,7 @@ abstract class DatabaseBase implements IDatabase { $this->mTrxIdleCallbacks = []; // clear $this->mTrxPreCommitCallbacks = []; // clear $this->runOnTransactionIdleCallbacks( self::TRIGGER_ROLLBACK ); + $this->runTransactionListenerCallbacks( self::TRIGGER_ROLLBACK ); } /** @@ -2826,10 +3083,19 @@ abstract class DatabaseBase implements IDatabase { } } - /** - * @return bool - */ - protected function explicitTrxActive() { + public function flushSnapshot( $fname = __METHOD__ ) { + if ( $this->writesOrCallbacksPending() || $this->explicitTrxActive() ) { + // This only flushes transactions to clear snapshots, not to write data + throw new DBUnexpectedError( + $this, + "$fname: Cannot COMMIT to clear snapshot because writes are pending." + ); + } + + $this->commit( $fname, self::FLUSHING_INTERNAL ); + } + + public function explicitTrxActive() { return $this->mTrxLevel && ( $this->mTrxAtomicLevels || !$this->mTrxAutomatic ); } @@ -2933,22 +3199,43 @@ abstract class DatabaseBase implements IDatabase { } } - public function ping() { - try { - // This will reconnect if possible, or error out if not - $this->query( "SELECT 1 AS ping", __METHOD__ ); - return true; - } catch ( DBError $e ) { - return false; + public function ping( &$rtt = null ) { + // Avoid hitting the server if it was hit recently + if ( $this->isOpen() && ( microtime( true ) - $this->lastPing ) < self::PING_TTL ) { + if ( !func_num_args() || $this->mRTTEstimate > 0 ) { + $rtt = $this->mRTTEstimate; + return true; // don't care about $rtt + } + } + + // This will reconnect if possible or return false if not + $this->clearFlag( DBO_TRX, self::REMEMBER_PRIOR ); + $ok = ( $this->query( self::PING_QUERY, __METHOD__, true ) !== false ); + $this->restoreFlags( self::RESTORE_PRIOR ); + + if ( $ok ) { + $rtt = $this->mRTTEstimate; } + + return $ok; } /** * @return bool */ protected function reconnect() { - # Stub. Not essential to override. - return true; + $this->closeConnection(); + $this->mOpened = false; + $this->mConn = false; + try { + $this->open( $this->mServer, $this->mUser, $this->mPassword, $this->mDBname ); + $this->lastPing = microtime( true ); + $ok = true; + } catch ( DBConnectionError $e ) { + $ok = false; + } + + return $ok; } public function getSessionLagStatus() { @@ -2956,7 +3243,7 @@ abstract class DatabaseBase implements IDatabase { } /** - * Get the slave lag when the current transaction started + * Get the replica DB lag when the current transaction started * * This is useful when transactions might use snapshot isolation * (e.g. REPEATABLE-READ in innodb), so the "real" lag of that data @@ -2968,19 +3255,19 @@ abstract class DatabaseBase implements IDatabase { */ public function getTransactionLagStatus() { return $this->mTrxLevel - ? [ 'lag' => $this->mTrxSlaveLag, 'since' => $this->trxTimestamp() ] + ? [ 'lag' => $this->mTrxReplicaLag, 'since' => $this->trxTimestamp() ] : null; } /** - * Get a slave lag estimate for this server + * Get a replica DB lag estimate for this server * * @return array ('lag': seconds or false on error, 'since': UNIX timestamp of estimate) * @since 1.27 */ public function getApproximateLagStatus() { return [ - 'lag' => $this->getLBInfo( 'slave' ) ? $this->getLag() : 0, + 'lag' => $this->getLBInfo( 'replica' ) ? $this->getLag() : 0, 'since' => microtime( true ) ]; } @@ -3286,6 +3573,14 @@ abstract class DatabaseBase implements IDatabase { } public function getScopedLockAndFlush( $lockKey, $fname, $timeout ) { + if ( $this->writesOrCallbacksPending() ) { + // This only flushes transactions to clear snapshots, not to write data + throw new DBUnexpectedError( + $this, + "$fname: Cannot COMMIT to clear snapshot because writes are pending." + ); + } + if ( !$this->lock( $lockKey, $fname, $timeout ) ) { return null; }