protected $serverTags;
/** @var int */
protected $numServers;
+ /** @var int UNIX timestamp */
+ protected $lastGarbageCollect = 0;
/** @var int */
- protected $lastExpireAll = 0;
+ protected $purgePeriod = 10;
/** @var int */
- protected $purgePeriod = 100;
+ protected $purgeLimit = 100;
/** @var int */
protected $shards = 1;
/** @var string */
/** @var array Exceptions */
protected $connFailureErrors = [];
+ /** @var int */
+ const GARBAGE_COLLECT_DELAY_SEC = 1;
+
/**
* Constructor. Parameters are:
* - server: A server info structure in the format required by each
* when a cluster is replicated to another site (with different host names)
* but each server has a corresponding replica in the other cluster.
*
- * - purgePeriod: The average number of object cache requests in between
+ * - purgePeriod: The average number of object cache writes in between
* garbage collection operations, where expired entries
* are removed from the database. Or in other words, the
* reciprocal of the probability of purging on any given
- * request. If this is set to zero, purging will never be
- * done.
+ * write. If this is set to zero, purging will never be done.
+ *
+ * - purgeLimit: Maximum number of rows to purge at once.
*
* - tableName: The table name to use, default is "objectcache".
*
if ( isset( $params['purgePeriod'] ) ) {
$this->purgePeriod = intval( $params['purgePeriod'] );
}
+ if ( isset( $params['purgeLimit'] ) ) {
+ $this->purgeLimit = intval( $params['purgeLimit'] );
+ }
if ( isset( $params['tableName'] ) ) {
$this->tableName = $params['tableName'];
}
return false;
}
- public function getMulti( array $keys, $flags = 0 ) {
+ protected function doGetMulti( array $keys, $flags = 0 ) {
$values = [];
$blobs = $this->fetchBlobMulti( $keys );
return $values;
}
- public function fetchBlobMulti( array $keys, $flags = 0 ) {
+ protected function fetchBlobMulti( array $keys, $flags = 0 ) {
$values = []; // array of (key => value)
$keysByTable = [];
$keysByTable[$serverIndex][$tableName][] = $key;
}
- $this->garbageCollect(); // expire old entries if any
-
$dataRows = [];
foreach ( $keysByTable as $serverIndex => $serverKeys ) {
try {
$keysByTable[$serverIndex][$tableName][] = $key;
}
- $this->garbageCollect(); // expire old entries if any
-
$result = true;
$exptime = (int)$expiry;
+ /** @noinspection PhpUnusedLocalVariableInspection */
$silenceScope = $this->silenceTransactionProfiler();
foreach ( $keysByTable as $serverIndex => $serverKeys ) {
$db = null;
try {
$db = $this->getDB( $serverIndex );
+ $this->occasionallyGarbageCollect( $db );
} catch ( DBError $e ) {
$this->handleWriteError( $e, $db, $serverIndex );
$result = false;
return $result;
}
- public function set( $key, $value, $exptime = 0, $flags = 0 ) {
- $ok = $this->setMulti( [ $key => $value ], $exptime );
+ protected function doSet( $key, $value, $exptime = 0, $flags = 0 ) {
+ $ok = $this->insertMulti( [ $key => $value ], $exptime, $flags, true );
return $ok;
}
protected function cas( $casToken, $key, $value, $exptime = 0, $flags = 0 ) {
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
$db = null;
+ /** @noinspection PhpUnusedLocalVariableInspection */
$silenceScope = $this->silenceTransactionProfiler();
try {
$db = $this->getDB( $serverIndex );
}
public function deleteMulti( array $keys, $flags = 0 ) {
+ return $this->purgeMulti( $keys, $flags );
+ }
+
+ public function purgeMulti( array $keys, $flags = 0 ) {
$keysByTable = [];
foreach ( $keys as $key ) {
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
}
$result = true;
+ /** @noinspection PhpUnusedLocalVariableInspection */
$silenceScope = $this->silenceTransactionProfiler();
foreach ( $keysByTable as $serverIndex => $serverKeys ) {
$db = null;
return $result;
}
- public function delete( $key, $flags = 0 ) {
- $ok = $this->deleteMulti( [ $key ], $flags );
+ protected function doDelete( $key, $flags = 0 ) {
+ $ok = $this->purgeMulti( [ $key ], $flags );
return $ok;
}
public function incr( $key, $step = 1 ) {
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
$db = null;
+ /** @noinspection PhpUnusedLocalVariableInspection */
$silenceScope = $this->silenceTransactionProfiler();
try {
$db = $this->getDB( $serverIndex );
'exptime' => $row->exptime
],
__METHOD__,
- 'IGNORE'
+ [ 'IGNORE' ]
);
if ( $db->affectedRows() == 0 ) {
public function changeTTL( $key, $exptime = 0, $flags = 0 ) {
list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
$db = null;
+ /** @noinspection PhpUnusedLocalVariableInspection */
$silenceScope = $this->silenceTransactionProfiler();
try {
$db = $this->getDB( $serverIndex );
* @return bool
*/
protected function isExpired( $db, $exptime ) {
- return $exptime != $this->getMaxDateTime( $db ) && wfTimestamp( TS_UNIX, $exptime ) < time();
+ return (
+ $exptime != $this->getMaxDateTime( $db ) &&
+ wfTimestamp( TS_UNIX, $exptime ) < time()
+ );
}
/**
}
}
- protected function garbageCollect() {
- if ( !$this->purgePeriod || $this->replicaOnly ) {
- // Disabled
- return;
- }
- // Only purge on one in every $this->purgePeriod requests.
- if ( $this->purgePeriod !== 1 && mt_rand( 0, $this->purgePeriod - 1 ) ) {
- return;
- }
- $now = time();
- // Avoid repeating the delete within a few seconds
- if ( $now > ( $this->lastExpireAll + 1 ) ) {
- $this->lastExpireAll = $now;
- $this->expireAll();
+ /**
+ * @param IDatabase $db
+ * @throws DBError
+ */
+ protected function occasionallyGarbageCollect( IDatabase $db ) {
+ if (
+ // Random purging is enabled
+ $this->purgePeriod &&
+ // This is not using a replica DB
+ !$this->replicaOnly &&
+ // Only purge on one in every $this->purgePeriod writes
+ mt_rand( 0, $this->purgePeriod - 1 ) == 0 &&
+ // Avoid repeating the delete within a few seconds
+ ( time() - $this->lastGarbageCollect ) > self::GARBAGE_COLLECT_DELAY_SEC
+ ) {
+ $garbageCollector = function () use ( $db ) {
+ $this->deleteServerObjectsExpiringBefore( $db, time(), null, $this->purgeLimit );
+ $this->lastGarbageCollect = time();
+ };
+ if ( $this->asyncHandler ) {
+ $this->lastGarbageCollect = time(); // avoid duplicate enqueues
+ ( $this->asyncHandler )( $garbageCollector );
+ } else {
+ $garbageCollector();
+ }
}
}
public function expireAll() {
- $this->deleteObjectsExpiringBefore( wfTimestampNow() );
+ $this->deleteObjectsExpiringBefore( time() );
}
- /**
- * Delete objects from the database which expire before a certain date.
- * @param string $timestamp
- * @param bool|callable $progressCallback
- * @return bool
- */
- public function deleteObjectsExpiringBefore( $timestamp, $progressCallback = false ) {
+ public function deleteObjectsExpiringBefore(
+ $timestamp,
+ callable $progressCallback = null,
+ $limit = INF
+ ) {
+ /** @noinspection PhpUnusedLocalVariableInspection */
$silenceScope = $this->silenceTransactionProfiler();
- for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
+
+ $serverIndexes = range( 0, $this->numServers - 1 );
+ shuffle( $serverIndexes );
+
+ $ok = true;
+
+ $keysDeletedCount = 0;
+ foreach ( $serverIndexes as $numServersDone => $serverIndex ) {
$db = null;
try {
$db = $this->getDB( $serverIndex );
- $dbTimestamp = $db->timestamp( $timestamp );
- $totalSeconds = false;
- $baseConds = [ 'exptime < ' . $db->addQuotes( $dbTimestamp ) ];
- for ( $i = 0; $i < $this->shards; $i++ ) {
- $maxExpTime = false;
- while ( true ) {
- $conds = $baseConds;
- if ( $maxExpTime !== false ) {
- $conds[] = 'exptime >= ' . $db->addQuotes( $maxExpTime );
- }
- $rows = $db->select(
- $this->getTableNameByShard( $i ),
- [ 'keyname', 'exptime' ],
- $conds,
- __METHOD__,
- [ 'LIMIT' => 100, 'ORDER BY' => 'exptime' ] );
- if ( $rows === false || !$rows->numRows() ) {
- break;
- }
- $keys = [];
- $row = $rows->current();
- $minExpTime = $row->exptime;
- if ( $totalSeconds === false ) {
- $totalSeconds = wfTimestamp( TS_UNIX, $timestamp )
- - wfTimestamp( TS_UNIX, $minExpTime );
- }
- foreach ( $rows as $row ) {
- $keys[] = $row->keyname;
- $maxExpTime = $row->exptime;
- }
-
- $db->delete(
- $this->getTableNameByShard( $i ),
- [
- 'exptime >= ' . $db->addQuotes( $minExpTime ),
- 'exptime < ' . $db->addQuotes( $dbTimestamp ),
- 'keyname' => $keys
- ],
- __METHOD__ );
-
- if ( $progressCallback ) {
- if ( intval( $totalSeconds ) === 0 ) {
- $percent = 0;
- } else {
- $remainingSeconds = wfTimestamp( TS_UNIX, $timestamp )
- - wfTimestamp( TS_UNIX, $maxExpTime );
- if ( $remainingSeconds > $totalSeconds ) {
- $totalSeconds = $remainingSeconds;
- }
- $processedSeconds = $totalSeconds - $remainingSeconds;
- $percent = ( $i + $processedSeconds / $totalSeconds )
- / $this->shards * 100;
- }
- $percent = ( $percent / $this->numServers )
- + ( $serverIndex / $this->numServers * 100 );
- call_user_func( $progressCallback, $percent );
- }
- }
- }
+ $this->deleteServerObjectsExpiringBefore(
+ $db,
+ $timestamp,
+ $progressCallback,
+ $limit,
+ $numServersDone,
+ $keysDeletedCount
+ );
} catch ( DBError $e ) {
$this->handleWriteError( $e, $db, $serverIndex );
- return false;
+ $ok = false;
}
}
- return true;
+
+ return $ok;
+ }
+
+ /**
+ * @param IDatabase $db
+ * @param string|int $timestamp
+ * @param callable|null $progressCallback
+ * @param int $limit
+ * @param int $serversDoneCount
+ * @param int &$keysDeletedCount
+ * @throws DBError
+ */
+ private function deleteServerObjectsExpiringBefore(
+ IDatabase $db,
+ $timestamp,
+ $progressCallback,
+ $limit,
+ $serversDoneCount = 0,
+ &$keysDeletedCount = 0
+ ) {
+ $cutoffUnix = wfTimestamp( TS_UNIX, $timestamp );
+ $shardIndexes = range( 0, $this->shards - 1 );
+ shuffle( $shardIndexes );
+
+ foreach ( $shardIndexes as $numShardsDone => $shardIndex ) {
+ $continue = null; // last exptime
+ $lag = null; // purge lag
+ do {
+ $res = $db->select(
+ $this->getTableNameByShard( $shardIndex ),
+ [ 'keyname', 'exptime' ],
+ array_merge(
+ [ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ) ],
+ $continue ? [ 'exptime >= ' . $db->addQuotes( $continue ) ] : []
+ ),
+ __METHOD__,
+ [ 'LIMIT' => min( $limit, 100 ), 'ORDER BY' => 'exptime' ]
+ );
+
+ if ( $res->numRows() ) {
+ $row = $res->current();
+ if ( $lag === null ) {
+ $lag = max( $cutoffUnix - wfTimestamp( TS_UNIX, $row->exptime ), 1 );
+ }
+
+ $keys = [];
+ foreach ( $res as $row ) {
+ $keys[] = $row->keyname;
+ $continue = $row->exptime;
+ }
+
+ $db->delete(
+ $this->getTableNameByShard( $shardIndex ),
+ [
+ 'exptime < ' . $db->addQuotes( $db->timestamp( $cutoffUnix ) ),
+ 'keyname' => $keys
+ ],
+ __METHOD__
+ );
+ $keysDeletedCount += $db->affectedRows();
+ }
+
+ if ( is_callable( $progressCallback ) ) {
+ if ( $lag ) {
+ $remainingLag = $cutoffUnix - wfTimestamp( TS_UNIX, $continue );
+ $processedLag = max( $lag - $remainingLag, 0 );
+ $doneRatio = ( $numShardsDone + $processedLag / $lag ) / $this->shards;
+ } else {
+ $doneRatio = 1;
+ }
+
+ $overallRatio = ( $doneRatio / $this->numServers )
+ + ( $serversDoneCount / $this->numServers );
+ call_user_func( $progressCallback, $overallRatio * 100 );
+ }
+ } while ( $res->numRows() && $keysDeletedCount < $limit );
+ }
}
/**
* @return bool
*/
public function deleteAll() {
+ /** @noinspection PhpUnusedLocalVariableInspection */
$silenceScope = $this->silenceTransactionProfiler();
for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
$db = null;
return true;
}
+ public function lock( $key, $timeout = 6, $expiry = 6, $rclass = '' ) {
+ // Avoid deadlocks and allow lock reentry if specified
+ if ( isset( $this->locks[$key] ) ) {
+ if ( $rclass != '' && $this->locks[$key]['class'] === $rclass ) {
+ ++$this->locks[$key]['depth'];
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ list( $serverIndex ) = $this->getTableByKey( $key );
+
+ $db = null;
+ try {
+ $db = $this->getDB( $serverIndex );
+ $ok = $db->lock( $key, __METHOD__, $timeout );
+ if ( $ok ) {
+ $this->locks[$key] = [ 'class' => $rclass, 'depth' => 1 ];
+ }
+
+ $this->logger->warning(
+ __METHOD__ . " failed due to timeout for {key}.",
+ [ 'key' => $key, 'timeout' => $timeout ]
+ );
+
+ return $ok;
+ } catch ( DBError $e ) {
+ $this->handleWriteError( $e, $db, $serverIndex );
+ $ok = false;
+ }
+
+ return $ok;
+ }
+
+ public function unlock( $key ) {
+ if ( !isset( $this->locks[$key] ) ) {
+ return false;
+ }
+
+ if ( --$this->locks[$key]['depth'] <= 0 ) {
+ unset( $this->locks[$key] );
+
+ list( $serverIndex ) = $this->getTableByKey( $key );
+
+ $db = null;
+ try {
+ $db = $this->getDB( $serverIndex );
+ $ok = $db->unlock( $key, __METHOD__ );
+ if ( !$ok ) {
+ $this->logger->warning(
+ __METHOD__ . ' failed to release lock for {key}.',
+ [ 'key' => $key ]
+ );
+ }
+ } catch ( DBError $e ) {
+ $this->handleWriteError( $e, $db, $serverIndex );
+ $ok = false;
+ }
+ } else {
+ $ok = false;
+ }
+
+ return $ok;
+ }
+
/**
* Serialize an object and, if possible, compress the representation.
* On typical message and page data, this can provide a 3X decrease
* in storage requirements.
*
- * @param mixed &$data
+ * @param mixed $data
* @return string
*/
- protected function serialize( &$data ) {
+ protected function serialize( $data ) {
$serial = serialize( $data );
if ( function_exists( 'gzdeflate' ) ) {
* @param int $serverIndex
* @throws Exception
*/
- protected function handleWriteError( DBError $exception, IDatabase $db = null, $serverIndex ) {
- if ( !$db ) {
+ protected function handleWriteError( DBError $exception, $db, $serverIndex ) {
+ if ( !( $db instanceof IDatabase ) ) {
$this->markServerDown( $exception, $serverIndex );
}